Coverage for model_workflow/tools/get_reduced_trajectory.py: 47%

30 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1from subprocess import run, PIPE, Popen 

2 

3from math import ceil 

4 

5from model_workflow.utils.constants import GROMACS_EXECUTABLE, INCOMPLETE_PREFIX, GREY_HEADER, COLOR_END 

6from model_workflow.utils.type_hints import * 

7 

8def calculate_frame_step(snapshots, reduced_trajectory_frames_limit): 

9 # Calculate the step between frames in the reduced trajectory to match the final number of frames 

10 # WARNING: Since the step must be an integer the thorical step must be rounded 

11 # This means the specified final number of frames may not be accomplished, but it is okey 

12 # WARNING: Since the step is rounded with the math.ceil function it will always be rounded up 

13 # This means the final number of frames will be the specified or less 

14 # CRITICAL WARNING: 

15 # This formula is exactly the same that the client uses to request stepped frames to the API 

16 # This means that the client and the workflow are coordinated and these formulas must not change 

17 # If you decide to change this formula (in both workflow and client)... 

18 # You will have to run again all the database analyses with reduced trajectories 

19 step = ceil(snapshots / reduced_trajectory_frames_limit) 

20 

21 # Calculate also the final number of frames given the current step and return this value 

22 # WARNING: It may seem that the final number of frames is math.floor(snapshots / step) 

23 # HOWEVER, the frame 0 also counts so it would be math.floor() + 1 

24 # HOWEVER, when snapshots / step % 0, the last frame is not returned 

25 # For this reason, the final number of frames is equal to the ceiling of the division 

26 frames = ceil(snapshots / step) 

27 

28 return step, frames 

29 

30# Get a reduced version of the provided trajectory 

31# Frames are taken along the whole trajectory 

32# Several analyses use this function since they use a reduced number of frames to work 

33# The output trajectory filename is set here and returned 

34# This is because reduced trajectory names must be standard, since they are reused 

35# If the reduced trajectory already exists do not build it again but return its filename 

36# In addition returns always the step and the expected final number of frames 

37def get_reduced_trajectory ( 

38 input_topology_file : 'File', 

39 input_trajectory_file : 'File', 

40 snapshots : int, 

41 reduced_trajectory_frames_limit : int, 

42 ) -> tuple: 

43 

44 # If the trajectory already has the reduced number of frames or less return here 

45 if reduced_trajectory_frames_limit >= snapshots: 

46 output_trajectory_filepath = input_trajectory_file.path 

47 step = 1 

48 frames = snapshots 

49 return output_trajectory_filepath, step, frames 

50 

51 # Set the reduced trajectory filename 

52 output_trajectory_filename = f'f{reduced_trajectory_frames_limit}.trajectory.xtc' 

53 # Set the output trajectory file in the same directory than the input trajectory 

54 output_trajectory_file = input_trajectory_file.get_neighbour_file(output_trajectory_filename) 

55 

56 # Set the incomplete reduced trajectory file 

57 # This prevents the workflow from using an incomplete reduced trajectroy in case the workflow was suddenly interrupted 

58 incomplete_trajectory_file = input_trajectory_file.get_prefixed_file(INCOMPLETE_PREFIX) 

59 

60 # Calculate the step between frames in the reduced trajectory to match the final number of frames 

61 step, frames = calculate_frame_step(snapshots, reduced_trajectory_frames_limit) 

62 

63 # Create the reduced trajectory if it does not exist yet 

64 if not output_trajectory_file.exists: 

65 print(f'Reducing trajectory from {snapshots} to less than {reduced_trajectory_frames_limit} frames') 

66 print(GREY_HEADER) 

67 # Run Gromacs 

68 p = Popen([ 

69 "echo", 

70 "System", 

71 ], stdout=PIPE) 

72 logs = run([ 

73 GROMACS_EXECUTABLE, 

74 "trjconv", 

75 "-s", 

76 input_topology_file.path, 

77 "-f", 

78 input_trajectory_file.path, 

79 '-o', 

80 incomplete_trajectory_file.path, 

81 '-skip', 

82 str(step), 

83 '-quiet' 

84 ], stdin=p.stdout, stdout=PIPE).stdout.decode() 

85 p.stdout.close() 

86 print(COLOR_END) 

87 # Check the output file exists at the end 

88 if not incomplete_trajectory_file.exists: 

89 print(logs) 

90 raise SystemExit('Something went wrong with GROMACS while reducing the trajectory') 

91 # Once the trajectory is complete we rename it as complete 

92 incomplete_trajectory_file.rename_to(output_trajectory_file) 

93 

94 

95 

96 # Return gromacs logs 

97 return output_trajectory_file.path, step, frames