Coverage for mddb_workflow/tools/get_reduced_trajectory.py: 61%
23 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1from math import ceil
3from mddb_workflow.utils.constants import INCOMPLETE_PREFIX
4from mddb_workflow.utils.gmx_spells import run_gromacs
5from mddb_workflow.utils.type_hints import *
7def calculate_frame_step(snapshots: int, reduced_trajectory_frames_limit: int) -> tuple[int, int]:
8 """
9 Calculate the step between frames in the reduced trajectory to match the final number of frames.
10 Also calculate the final number of frames given the current step.
12 Returns:
13 tuple:
14 - step (int): The step between frames in the reduced trajectory.
15 - frames (int): The final number of frames in the reduced trajectory.
16 """
17 # WARNING: Since the step must be an integer the thorical step must be rounded
18 # This means the specified final number of frames may not be accomplished, but it is okey
19 # WARNING: Since the step is rounded with the math.ceil function it will always be rounded up
20 # This means the final number of frames will be the specified or less
21 # CRITICAL WARNING:
22 # This formula is exactly the same that the client uses to request stepped frames to the API
23 # This means that the client and the workflow are coordinated and these formulas must not change
24 # If you decide to change this formula (in both workflow and client)...
25 # You will have to run again all the database analyses with reduced trajectories
26 step = ceil(snapshots / reduced_trajectory_frames_limit)
28 # Calculate also the final number of frames given the current step and return this value
29 # WARNING: It may seem that the final number of frames is math.floor(snapshots / step)
30 # HOWEVER, the frame 0 also counts so it would be math.floor() + 1
31 # HOWEVER, when snapshots / step % 0, the last frame is not returned
32 # For this reason, the final number of frames is equal to the ceiling of the division
33 frames = ceil(snapshots / step)
35 return step, frames
37# Get a reduced version of the provided trajectory
38# Frames are taken along the whole trajectory
39# Several analyses use this function since they use a reduced number of frames to work
40# The output trajectory filename is set here and returned
41# This is because reduced trajectory names must be standard, since they are reused
42# If the reduced trajectory already exists do not build it again but return its filename
43# In addition returns always the step and the expected final number of frames
44def get_reduced_trajectory (
45 input_topology_file : 'File',
46 input_trajectory_file : 'File',
47 snapshots : int,
48 reduced_trajectory_frames_limit : int,
49 ) -> tuple:
51 # If the trajectory already has the reduced number of frames or less return here
52 if reduced_trajectory_frames_limit >= snapshots:
53 output_trajectory_filepath = input_trajectory_file.path
54 step = 1
55 frames = snapshots
56 return output_trajectory_filepath, step, frames
58 # Set the reduced trajectory filename
59 output_trajectory_filename = f'f{reduced_trajectory_frames_limit}.trajectory.xtc'
60 # Set the output trajectory file in the same directory than the input trajectory
61 output_trajectory_file = input_trajectory_file.get_neighbour_file(output_trajectory_filename)
63 # Set the incomplete reduced trajectory file
64 # This prevents the workflow from using an incomplete reduced trajectroy in case the workflow was suddenly interrupted
65 incomplete_trajectory_file = input_trajectory_file.get_prefixed_file(INCOMPLETE_PREFIX)
67 # Calculate the step between frames in the reduced trajectory to match the final number of frames
68 step, frames = calculate_frame_step(snapshots, reduced_trajectory_frames_limit)
70 # Create the reduced trajectory if it does not exist yet
71 if not output_trajectory_file.exists:
72 print(f'Reducing trajectory from {snapshots} to less than {reduced_trajectory_frames_limit} frames')
73 # Run Gromacs
74 run_gromacs(f'trjconv -s {input_topology_file.path} -f {input_trajectory_file.path} \
75 -o {incomplete_trajectory_file.path} -skip {step}',
76 user_input = 'System')
77 # Once the trajectory is complete we rename it as complete
78 incomplete_trajectory_file.rename_to(output_trajectory_file)
80 # Return gromacs logs
81 return output_trajectory_file.path, step, frames