Coverage for model_workflow/analyses/markov.py: 16%

58 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1import mdtraj as mdt 

2 

3from model_workflow.tools.get_screenshot import get_screenshot 

4from model_workflow.utils.auxiliar import save_json 

5from model_workflow.utils.constants import OUTPUT_MARKOV_FILENAME, PROTEIN_AND_NUCLEIC 

6from model_workflow.utils.type_hints import * 

7 

8def markov ( 

9 structure_file : 'File', 

10 trajectory_file : 'File', 

11 output_directory : str, 

12 structure : 'Structure', 

13 populations : List[float], 

14 rmsd_selection : str = PROTEIN_AND_NUCLEIC, 

15 nodes_number : int = 20, 

16): 

17 """Set the data needed to represent a Markov State Model graph in the client. 

18 This is finding the most populated frames and calculating an RMSD matrix between these frames.""" 

19 

20 # If there is no populations then we stop here 

21 if not populations or len(populations) == 0: 

22 print(' There are no populations') 

23 return 

24 

25 # Set the main output filepath 

26 output_analysis_filepath = f'{output_directory}/{OUTPUT_MARKOV_FILENAME}' 

27 

28 # Parse the RMSD selection in VMD selection syntax 

29 parsed_selection = structure.select(rmsd_selection, syntax='vmd') 

30 # If there is nothing to check then warn the user and stop here 

31 if not parsed_selection: 

32 raise SystemExit('There are not atoms to be analyzed for the RMSD matrix') 

33 

34 # Get the numbers of frames with highest populations 

35 population_per_frames = [ (population, frame) for frame, population in enumerate(populations) ] 

36 highest_populations = [] 

37 highest_population_frames = [] 

38 for population, frame in sorted(population_per_frames, reverse=True)[0:nodes_number]: 

39 highest_populations.append(population) 

40 highest_population_frames.append(frame) 

41 print(' Reading most populated frames in trajectory') 

42 # Read the trajectory frame by frame looking for the specified frames 

43 trajectory = mdt.iterload(trajectory_file.path, top=structure_file.path, chunk=1) 

44 # Set a generator for the frames to be selected once sorted 

45 selected_frames = iter(sorted(highest_population_frames)) 

46 next_frame = next(selected_frames) 

47 # Conserve only the desired frames 

48 frame_coordinates = {} 

49 for frame_number, frame in enumerate(trajectory): 

50 # Update the current frame log 

51 print(f' Frame {frame_number}', end='\r') 

52 # Skip the current frame if we do not need it 

53 if frame_number != next_frame: 

54 continue 

55 # Save it otherwise 

56 frame_coordinates[frame_number] = frame 

57 # Update the next frame 

58 next_frame = next(selected_frames, None) 

59 if next_frame == None: 

60 break 

61 print(' Calculating RMSD matrix') 

62 # Calculate the RMSD matrix between the selected frames 

63 rmsd_matrix = [] 

64 for x, frame in enumerate(highest_population_frames): 

65 rmsd_row = [] 

66 for y, other_frame in enumerate(highest_population_frames): 

67 # RMSD of any frame against itself is 0 

68 if x == y: 

69 rmsd_row.append(0) 

70 continue 

71 # If RMSD was previously calculated in the opposite way then copy the value 

72 if x > y: 

73 rmsd_row.append(rmsd_matrix[y][x]) 

74 continue 

75 # Otherwise calculate RMSD between the asked frames 

76 rsmd = mdt.rmsd(frame_coordinates[frame], frame_coordinates[other_frame], atom_indices=parsed_selection.atom_indices)[0] 

77 rmsd_row.append(float(rsmd)) 

78 rmsd_matrix.append(rmsd_row) 

79 # Make a copy of the structure to avoid mutating the original structure 

80 reference_structure = structure.copy() 

81 print(' Taking screenshots of selected frames') 

82 frame_count = str(len(frame_coordinates)) 

83 # Save the screenshot parameters so we can keep images coherent between states 

84 screenshot_parameters = None 

85 # For each frame coordinates, generate PDB file, take a scrrenshot and delete it 

86 for i, frame in enumerate(frame_coordinates.values(), 1): 

87 # Update the current frame log 

88 print(f' Screenshot {i}/{frame_count}', end='\r') 

89 # Get the actual coordinates 

90 coordinates = frame.xyz[0] * 10 # We multiply by to restor Ångstroms 

91 # Update the reference structure coordinates 

92 reference_structure.set_new_coordinates(coordinates) 

93 # Set the screenshot filename 

94 screenshot_filename = f'markov_screenshot_{str(i).zfill(2)}.jpg' 

95 # Generate the screenshot 

96 screenshot_parameters = get_screenshot(reference_structure, screenshot_filename, parameters=screenshot_parameters) 

97 # Export the analysis data to a json file 

98 data = { 

99 'frames': highest_population_frames, 

100 'populations': highest_populations, 

101 'rmsd_matrix': rmsd_matrix 

102 } 

103 save_json(data, output_analysis_filepath)