Coverage for mddb_workflow / analyses / markov.py: 17%

60 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 18:45 +0000

1import mdtraj as mdt 

2 

3from mddb_workflow.tools.get_screenshot import get_screenshot 

4from mddb_workflow.utils.auxiliar import save_json 

5from mddb_workflow.utils.constants import OUTPUT_MARKOV_FILENAME, PROTEIN_AND_NUCLEIC 

6from mddb_workflow.utils.file import File 

7from mddb_workflow.utils.type_hints import * 

8 

9def markov ( 

10 structure_file : 'File', 

11 trajectory_file : 'File', 

12 output_directory : str, 

13 structure : 'Structure', 

14 populations : list[float], 

15 rmsd_selection : str = PROTEIN_AND_NUCLEIC, 

16 nodes_number : int = 20, 

17): 

18 """Set the data needed to represent a Markov State Model graph in the client. 

19 This is finding the most populated frames and calculating an RMSD matrix between these frames.""" 

20 

21 # If there is no populations then we stop here 

22 if not populations or len(populations) == 0: 

23 print(' There are no populations') 

24 return 

25 

26 # Set the main output filepath 

27 output_analysis_filepath = f'{output_directory}/{OUTPUT_MARKOV_FILENAME}' 

28 

29 # Parse the RMSD selection in VMD selection syntax 

30 parsed_selection = structure.select(rmsd_selection, syntax='vmd') 

31 # If there is nothing to check then warn the user and stop here 

32 if not parsed_selection: 

33 raise RuntimeError('There are not atoms to be analyzed for the RMSD matrix') 

34 

35 # Get the numbers of frames with highest populations 

36 population_per_frames = [ (population, frame) for frame, population in enumerate(populations) ] 

37 highest_populations = [] 

38 highest_population_frames = [] 

39 for population, frame in sorted(population_per_frames, reverse=True)[0:nodes_number]: 

40 highest_populations.append(population) 

41 highest_population_frames.append(frame) 

42 print(' Reading most populated frames in trajectory') 

43 # Read the trajectory frame by frame looking for the specified frames 

44 trajectory = mdt.iterload(trajectory_file.path, top=structure_file.path, chunk=1) 

45 # Set a generator for the frames to be selected once sorted 

46 selected_frames = iter(sorted(highest_population_frames)) 

47 next_frame = next(selected_frames) 

48 # Conserve only the desired frames 

49 frame_coordinates = {} 

50 for frame_number, frame in enumerate(trajectory): 

51 # Update the current frame log 

52 print(f' Frame {frame_number}', end='\r') 

53 # Skip the current frame if we do not need it 

54 if frame_number != next_frame: 

55 continue 

56 # Save it otherwise 

57 frame_coordinates[frame_number] = frame 

58 # Update the next frame 

59 next_frame = next(selected_frames, None) 

60 if next_frame == None: 

61 break 

62 print(' Calculating RMSD matrix') 

63 # Calculate the RMSD matrix between the selected frames 

64 rmsd_matrix = [] 

65 for x, frame in enumerate(highest_population_frames): 

66 rmsd_row = [] 

67 for y, other_frame in enumerate(highest_population_frames): 

68 # RMSD of any frame against itself is 0 

69 if x == y: 

70 rmsd_row.append(0) 

71 continue 

72 # If RMSD was previously calculated in the opposite way then copy the value 

73 if x > y: 

74 rmsd_row.append(rmsd_matrix[y][x]) 

75 continue 

76 # Otherwise calculate RMSD between the asked frames 

77 rsmd = mdt.rmsd(frame_coordinates[frame], frame_coordinates[other_frame], atom_indices=parsed_selection.atom_indices)[0] 

78 rmsd_row.append(float(rsmd)) 

79 rmsd_matrix.append(rmsd_row) 

80 # Make a copy of the structure to avoid mutating the original structure 

81 reference_structure = structure.copy() 

82 print(' Taking screenshots of selected frames') 

83 frame_count = str(len(frame_coordinates)) 

84 # Save the screenshot parameters so we can keep images coherent between states 

85 screenshot_parameters = None 

86 # For each frame coordinates, generate PDB file, take a scrrenshot and delete it 

87 for i, frame in enumerate(frame_coordinates.values(), 1): 

88 # Update the current frame log 

89 print(f' Screenshot {i}/{frame_count}', end='\r') 

90 # Get the actual coordinates 

91 coordinates = frame.xyz[0] * 10 # We multiply by to restor Ångstroms 

92 # Update the reference structure coordinates 

93 reference_structure.set_new_coordinates(coordinates) 

94 # Set the screenshot filename 

95 screenshot_filepath = f'{output_directory}/markov_screenshot_{str(i).zfill(2)}.jpg' 

96 screenshot_file = File(screenshot_filepath) 

97 # Generate the screenshot 

98 screenshot_parameters = get_screenshot(reference_structure, screenshot_file, parameters=screenshot_parameters) 

99 # Export the analysis data to a json file 

100 data = { 

101 'frames': highest_population_frames, 

102 'populations': highest_populations, 

103 'rmsd_matrix': rmsd_matrix 

104 } 

105 save_json(data, output_analysis_filepath)