Coverage for mddb_workflow / analyses / markov.py: 17%
60 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1import mdtraj as mdt
3from mddb_workflow.tools.get_screenshot import get_screenshot
4from mddb_workflow.utils.auxiliar import save_json
5from mddb_workflow.utils.constants import OUTPUT_MARKOV_FILENAME, PROTEIN_AND_NUCLEIC
6from mddb_workflow.utils.file import File
7from mddb_workflow.utils.type_hints import *
9def markov (
10 structure_file : 'File',
11 trajectory_file : 'File',
12 output_directory : str,
13 structure : 'Structure',
14 populations : list[float],
15 rmsd_selection : str = PROTEIN_AND_NUCLEIC,
16 nodes_number : int = 20,
17):
18 """Set the data needed to represent a Markov State Model graph in the client.
19 This is finding the most populated frames and calculating an RMSD matrix between these frames."""
21 # If there is no populations then we stop here
22 if not populations or len(populations) == 0:
23 print(' There are no populations')
24 return
26 # Set the main output filepath
27 output_analysis_filepath = f'{output_directory}/{OUTPUT_MARKOV_FILENAME}'
29 # Parse the RMSD selection in VMD selection syntax
30 parsed_selection = structure.select(rmsd_selection, syntax='vmd')
31 # If there is nothing to check then warn the user and stop here
32 if not parsed_selection:
33 raise RuntimeError('There are not atoms to be analyzed for the RMSD matrix')
35 # Get the numbers of frames with highest populations
36 population_per_frames = [ (population, frame) for frame, population in enumerate(populations) ]
37 highest_populations = []
38 highest_population_frames = []
39 for population, frame in sorted(population_per_frames, reverse=True)[0:nodes_number]:
40 highest_populations.append(population)
41 highest_population_frames.append(frame)
42 print(' Reading most populated frames in trajectory')
43 # Read the trajectory frame by frame looking for the specified frames
44 trajectory = mdt.iterload(trajectory_file.path, top=structure_file.path, chunk=1)
45 # Set a generator for the frames to be selected once sorted
46 selected_frames = iter(sorted(highest_population_frames))
47 next_frame = next(selected_frames)
48 # Conserve only the desired frames
49 frame_coordinates = {}
50 for frame_number, frame in enumerate(trajectory):
51 # Update the current frame log
52 print(f' Frame {frame_number}', end='\r')
53 # Skip the current frame if we do not need it
54 if frame_number != next_frame:
55 continue
56 # Save it otherwise
57 frame_coordinates[frame_number] = frame
58 # Update the next frame
59 next_frame = next(selected_frames, None)
60 if next_frame == None:
61 break
62 print(' Calculating RMSD matrix')
63 # Calculate the RMSD matrix between the selected frames
64 rmsd_matrix = []
65 for x, frame in enumerate(highest_population_frames):
66 rmsd_row = []
67 for y, other_frame in enumerate(highest_population_frames):
68 # RMSD of any frame against itself is 0
69 if x == y:
70 rmsd_row.append(0)
71 continue
72 # If RMSD was previously calculated in the opposite way then copy the value
73 if x > y:
74 rmsd_row.append(rmsd_matrix[y][x])
75 continue
76 # Otherwise calculate RMSD between the asked frames
77 rsmd = mdt.rmsd(frame_coordinates[frame], frame_coordinates[other_frame], atom_indices=parsed_selection.atom_indices)[0]
78 rmsd_row.append(float(rsmd))
79 rmsd_matrix.append(rmsd_row)
80 # Make a copy of the structure to avoid mutating the original structure
81 reference_structure = structure.copy()
82 print(' Taking screenshots of selected frames')
83 frame_count = str(len(frame_coordinates))
84 # Save the screenshot parameters so we can keep images coherent between states
85 screenshot_parameters = None
86 # For each frame coordinates, generate PDB file, take a scrrenshot and delete it
87 for i, frame in enumerate(frame_coordinates.values(), 1):
88 # Update the current frame log
89 print(f' Screenshot {i}/{frame_count}', end='\r')
90 # Get the actual coordinates
91 coordinates = frame.xyz[0] * 10 # We multiply by to restor Ångstroms
92 # Update the reference structure coordinates
93 reference_structure.set_new_coordinates(coordinates)
94 # Set the screenshot filename
95 screenshot_filepath = f'{output_directory}/markov_screenshot_{str(i).zfill(2)}.jpg'
96 screenshot_file = File(screenshot_filepath)
97 # Generate the screenshot
98 screenshot_parameters = get_screenshot(reference_structure, screenshot_file, parameters=screenshot_parameters)
99 # Export the analysis data to a json file
100 data = {
101 'frames': highest_population_frames,
102 'populations': highest_populations,
103 'rmsd_matrix': rmsd_matrix
104 }
105 save_json(data, output_analysis_filepath)