Coverage for mddb_workflow/tools/get_reduced_trajectory.py: 61%

23 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 15:48 +0000

1from math import ceil 

2 

3from mddb_workflow.utils.constants import INCOMPLETE_PREFIX 

4from mddb_workflow.utils.gmx_spells import run_gromacs 

5from mddb_workflow.utils.type_hints import * 

6 

7def calculate_frame_step(snapshots: int, reduced_trajectory_frames_limit: int) -> tuple[int, int]: 

8 """ 

9 Calculate the step between frames in the reduced trajectory to match the final number of frames. 

10 Also calculate the final number of frames given the current step. 

11 

12 Returns: 

13 tuple: 

14 - step (int): The step between frames in the reduced trajectory. 

15 - frames (int): The final number of frames in the reduced trajectory. 

16 """ 

17 # WARNING: Since the step must be an integer the thorical step must be rounded 

18 # This means the specified final number of frames may not be accomplished, but it is okey 

19 # WARNING: Since the step is rounded with the math.ceil function it will always be rounded up 

20 # This means the final number of frames will be the specified or less 

21 # CRITICAL WARNING: 

22 # This formula is exactly the same that the client uses to request stepped frames to the API 

23 # This means that the client and the workflow are coordinated and these formulas must not change 

24 # If you decide to change this formula (in both workflow and client)... 

25 # You will have to run again all the database analyses with reduced trajectories 

26 step = ceil(snapshots / reduced_trajectory_frames_limit) 

27 

28 # Calculate also the final number of frames given the current step and return this value 

29 # WARNING: It may seem that the final number of frames is math.floor(snapshots / step) 

30 # HOWEVER, the frame 0 also counts so it would be math.floor() + 1 

31 # HOWEVER, when snapshots / step % 0, the last frame is not returned 

32 # For this reason, the final number of frames is equal to the ceiling of the division 

33 frames = ceil(snapshots / step) 

34 

35 return step, frames 

36 

37# Get a reduced version of the provided trajectory 

38# Frames are taken along the whole trajectory 

39# Several analyses use this function since they use a reduced number of frames to work 

40# The output trajectory filename is set here and returned 

41# This is because reduced trajectory names must be standard, since they are reused 

42# If the reduced trajectory already exists do not build it again but return its filename 

43# In addition returns always the step and the expected final number of frames 

44def get_reduced_trajectory ( 

45 input_topology_file : 'File', 

46 input_trajectory_file : 'File', 

47 snapshots : int, 

48 reduced_trajectory_frames_limit : int, 

49 ) -> tuple: 

50 

51 # If the trajectory already has the reduced number of frames or less return here 

52 if reduced_trajectory_frames_limit >= snapshots: 

53 output_trajectory_filepath = input_trajectory_file.path 

54 step = 1 

55 frames = snapshots 

56 return output_trajectory_filepath, step, frames 

57 

58 # Set the reduced trajectory filename 

59 output_trajectory_filename = f'f{reduced_trajectory_frames_limit}.trajectory.xtc' 

60 # Set the output trajectory file in the same directory than the input trajectory 

61 output_trajectory_file = input_trajectory_file.get_neighbour_file(output_trajectory_filename) 

62 

63 # Set the incomplete reduced trajectory file 

64 # This prevents the workflow from using an incomplete reduced trajectroy in case the workflow was suddenly interrupted 

65 incomplete_trajectory_file = input_trajectory_file.get_prefixed_file(INCOMPLETE_PREFIX) 

66 

67 # Calculate the step between frames in the reduced trajectory to match the final number of frames 

68 step, frames = calculate_frame_step(snapshots, reduced_trajectory_frames_limit) 

69 

70 # Create the reduced trajectory if it does not exist yet 

71 if not output_trajectory_file.exists: 

72 print(f'Reducing trajectory from {snapshots} to less than {reduced_trajectory_frames_limit} frames') 

73 # Run Gromacs 

74 run_gromacs(f'trjconv -s {input_topology_file.path} -f {input_trajectory_file.path} \ 

75 -o {incomplete_trajectory_file.path} -skip {step}', 

76 user_input = 'System') 

77 # Once the trajectory is complete we rename it as complete 

78 incomplete_trajectory_file.rename_to(output_trajectory_file) 

79 

80 # Return gromacs logs 

81 return output_trajectory_file.path, step, frames