Coverage for model_workflow/utils/mdt_spells.py: 21%
158 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1import os
2from os.path import exists, getsize
3from subprocess import run, PIPE
4from typing import List
6import mdtraj as mdt
7import numpy as np
9from model_workflow.utils.file import File
10from model_workflow.utils.gmx_spells import merge_xtc_files
11from model_workflow.utils.constants import GREY_HEADER, COLOR_END
13CURSOR_UP_ONE = '\x1b[1A'
14ERASE_LINE = '\x1b[2K'
15ERASE_PREVIOUS_LINE = CURSOR_UP_ONE + ERASE_LINE + CURSOR_UP_ONE
17# Set mdtraj supported formats
18mdtraj_supported_structure_formats = {
19 'pdb', 'pdb.gz' 'h5', 'lh5', 'prmtop', 'parm7', 'prm7', 'psf', 'mol2', 'hoomdxml', 'gro', 'arc', 'hdf5', 'gsd'
20}
21mdtraj_supported_trajectory_formats = {'dcd', 'xtc', 'trr', 'nc', 'h5', 'binpos', 'mdcrd', 'xyz', 'pdb'}
23# Use mdtraj 'mdconvert' command-line script (there is no python version for this tool apparently)
24# Multiple files may be selected with bash syntax
25def merge_and_convert_trajectories (
26 input_trajectory_filenames : List[str],
27 output_trajectory_filename : str
28 ):
30 # Run MDtraj
31 print(GREY_HEADER, end='\r')
32 process = run([
33 "mdconvert",
34 "-o",
35 output_trajectory_filename,
36 *input_trajectory_filenames,
37 ], stderr=PIPE)
38 error_logs = process.stderr.decode()
39 print(COLOR_END, end='\r')
41 # If output has not been generated then warn the user
42 if not exists(output_trajectory_filename) or getsize(output_trajectory_filename) == 0:
43 print(error_logs)
44 raise SystemExit('Something went wrong with MDTraj')
46# NEVER FORGET: mdconvert does not handle mdcrd format
47mdconvert_supported_formats = {'dcd', 'xtc', 'trr', 'nc', 'h5', 'binpos'}
48merge_and_convert_trajectories.format_sets = [
49 {
50 'inputs': {
51 'input_trajectory_filenames': mdconvert_supported_formats
52 },
53 'outputs': {
54 'output_trajectory_filename': mdconvert_supported_formats
55 }
56 },
57]
59# Merge and convert trajectories without the mdconvert command
60# WARNING: This process is slow since we must iterate and merge each frame
61# WARNING: This process is restricted to trr/xtc output files given the merger
62def merge_and_convert_trajectories_alternative (
63 input_structure_filename : str,
64 input_trajectory_filenames : List[str],
65 output_trajectory_filename : str
66 ):
68 # Assert we have input values
69 if not input_structure_filename:
70 raise SystemExit('ERROR: Missing input structure filenames')
71 if not input_trajectory_filenames:
72 raise SystemExit('ERROR: Missing input trajectory filenames')
73 if not output_trajectory_filename:
74 raise SystemExit('ERROR: Missing output trajectory filename')
76 # Load the topology, which is used further
77 topology = mdt.load_topology(input_structure_filename)
79 # If the output trajectory filename matches any input file then we must rename the input filename to not overwrite it
80 if output_trajectory_filename in input_trajectory_filenames:
81 backup_filename = 'backup.' + output_trajectory_filename
82 os.rename(output_trajectory_filename, backup_filename)
83 repeated_input_filename_index = input_trajectory_filenames.index(output_trajectory_filename)
84 input_trajectory_filenames[repeated_input_filename_index] = backup_filename
86 # If the output trajectory file already exists at this point then we must stop here
87 # The raw trjcat implementation will not join things to the end of it
88 if exists(output_trajectory_filename):
89 raise SystemExit('The output file already exists and overwrite is not supported for this functionality')
90 # Print an empty line for the first 'ERASE_PREVIOUS_LINE' to not delete a previous log
91 print()
92 # Iterate over the different input trajectory filenames
93 frame_filename = '.frame.xtc'
94 for input_trajectory_filename in input_trajectory_filenames:
95 # Load the trajectory frame by frame
96 trajectory = mdt.iterload(input_trajectory_filename, top=input_structure_filename, chunk=1)
97 for f, frame in enumerate(trajectory):
98 # Update the current frame log
99 print(ERASE_PREVIOUS_LINE)
100 print('Frame ' + str(f))
101 frame.save(frame_filename)
102 # Join current frame to the output trajectory
103 merge_xtc_files(output_trajectory_filename, frame_filename)
104 # Remove the residual file
105 # WARNING: It may not exist if the trajectory has 1 frame
106 if exists(frame_filename):
107 os.remove(frame_filename)
108merge_and_convert_trajectories_alternative.format_sets = [
109 {
110 'inputs': {
111 'input_structure_filename': mdtraj_supported_structure_formats,
112 'input_trajectory_filenames': mdtraj_supported_trajectory_formats
113 },
114 'outputs': {
115 'output_trajectory_filename': {'xtc', 'trr'}
116 }
117 },
118]
120# Merge and convert trajectories without the mdconvert command
121# WARNING: Note that this process is not memory efficient so beware the size of trajectories to be converted
122# DEPRECATED: This function was meant to convert trajectories to mdcrd, which is not supported by mdconvert
123# DEPRECATED: However the export to mdcrd in mdtraj does not allow to remove the periodic box, which may be a problem
124# DEPRECTAED: Use VMD instead
125def merge_and_convert_trajectories_unefficient (
126 input_structure_filename : str,
127 input_trajectory_filenames : List[str],
128 output_trajectory_filename : str,
129):
130 print('WARNING: You are using a not memory efficient tool. If the trajectory is too big your system may not hold it.')
131 # Load all trajectories together
132 sample_trajectory = input_trajectory_filenames[0]
133 if input_structure_filename:
134 trajectory = mdt.load(sample_trajectory, top=input_structure_filename)
135 else:
136 trajectory = mdt.load(sample_trajectory)
137 for extra_trajectory in input_trajectory_filenames[1:]:
138 if input_structure_filename:
139 extra = mdt.load(sample_trajectory, top=input_structure_filename)
140 else:
141 extra = mdt.load(sample_trajectory)
142 trajectory = mdt.join([trajectory, extra], check_topology=False)
144 # Write the whole trajectory
145 trajectory.save(output_trajectory_filename)
147merge_and_convert_trajectories_unefficient.format_sets = [
148 {
149 'inputs': {
150 'input_structure_filename': mdtraj_supported_structure_formats,
151 'input_trajectory_filenames': mdtraj_supported_trajectory_formats
152 },
153 'outputs': {
154 'output_trajectory_filename': mdtraj_supported_trajectory_formats
155 }
156 },
157]
159# Get specific frames from a trajectory
160# WARNING: This function is time unefficient
161def get_trajectory_subset (
162 input_structure_filename : str,
163 input_trajectory_filename : str,
164 output_trajectory_filename : str,
165 start : int = 0,
166 end : int = None,
167 step : int = 1,
168 frames : List[int] = [],
169 skip : List[int] = [],
170):
172 # Load the trajectory frame by frame and get only the desired frames
173 if input_structure_filename:
174 trajectory = mdt.iterload(input_trajectory_filename, top=input_structure_filename, chunk=1)
175 else:
176 trajectory = mdt.iterload(input_trajectory_filename, chunk=1)
177 # Set the reduced trajectory to be returned
178 reduced_trajectory = None
179 # Specific frames scenario
180 # DANI: No se ha provado
181 if frames and len(frames) > 0:
182 # Get the first and the last frames
183 sorted_frames = sorted(frames)
184 first_frame = sorted_frames[0]
185 last_frame = sorted_frames[-1]
186 # Get the list of frames as a set to speed up the searches
187 selected_frames = set(frames)
188 # Get the first chunk/frame
189 for frame, chunk in enumerate(trajectory):
190 if frame == first_frame:
191 reduced_trajectory = chunk
192 break
193 # If we have nothing at this point then it means our start is out of the frames range
194 if not reduced_trajectory:
195 raise SystemExit('None of the selected frames are in range of the trajectory frames number')
196 # Get further chunks/frames
197 for frame, chunk in enumerate(trajectory, first_frame + 1):
198 if frame in selected_frames:
199 reduced_trajectory = mdt.join([reduced_trajectory, chunk], check_topology=False)
200 # If this is the last frame then stop here to avoid reading the rest of the trajectory
201 if frame >= last_frame:
202 break
203 # Start / End / Step + Skip scenario
204 else:
205 # Get the first chunk
206 frame_count = 0 # This count works only in case the start frame is out of range, for the logs
207 for i, chunk in enumerate(trajectory):
208 frame_count = i
209 if i == start:
210 reduced_trajectory = chunk
211 break
212 # If we have nothing at this point then it means our start is out of the frames range
213 if not reduced_trajectory:
214 frame_count += 1
215 raise SystemExit('The trajectory has ' + str(frame_count) + ' frames so we can not start at frame ' + str(start))
216 # Get further chunks
217 for i, chunk in enumerate(trajectory, 1): # Start the count at 1
218 frame = start + i
219 if frame in skip:
220 continue
221 if frame == end:
222 break
223 if i % step == 0:
224 reduced_trajectory = mdt.join([reduced_trajectory, chunk], check_topology=False)
226 # WARNING: This print here is not just a log. DO NOT REMOVE IT
227 # WARNING: It fixes an error (ValueError: unitcell angle < 0) which happens sometimes
228 # WARNING: It fixes an error (ValueError: Only rectilinear boxes can be saved to mdcrd files) which happens sometimes
229 # DANI: Esto es azaroso: a veces funciona y a veces no
230 #print(reduced_trajectory)
232 # This is necessary sometimes to avoid the following error:
233 # ValueError: Only rectilinear boxes can be saved to mdcrd files
234 # DANI: Esto suele funcionar, aunque a veces mata el proceso
235 # e.g. ValueError: unitcell_lengths must be shape (5, 3). You supplied (1, 3)
236 # Este ejemplo fue observado con las trayectorias raw de 'NAFlex_1d11'
237 # mdtb subset -is structure.stripped.pdb -it structure.stripped.mdcrd -ot test.mdcrd -start 2 -end 17 -step 3
238 # DANI: Va bien que esto esté desactivado, así cuando falle podré compartir la trayectoria que da problemas a los de mdtraj:
239 # https://github.com/mdtraj/mdtraj/issues/1730
240 #reduced_trajectory.unitcell_lengths = [0,0,0]
241 #reduced_trajectory.unitcell_angles = [90,90,90]
243 # Write reduced trajectory to output file
244 reduced_trajectory.save(output_trajectory_filename)
245get_trajectory_subset.format_sets = [
246 {
247 'inputs': {
248 'input_structure_filename': mdtraj_supported_structure_formats,
249 'input_trajectory_filename': mdtraj_supported_trajectory_formats
250 },
251 'outputs': {
252 'output_trajectory_filename': mdtraj_supported_trajectory_formats
253 }
254 },
255 {
256 'inputs': {
257 'input_structure_filename': None,
258 'input_trajectory_filename': mdtraj_supported_structure_formats
259 },
260 'outputs': {
261 'output_trajectory_filename': mdtraj_supported_structure_formats
262 }
263 }
264]
266# Get first frame from a trajectory
267def get_first_frame (
268 input_structure_filename : str,
269 input_trajectory_filename : str,
270 output_frame_filename : str,
271):
272 get_trajectory_subset(input_structure_filename, input_trajectory_filename, output_frame_filename, 0)
273# Set function supported formats
274single_frame_getter_format_sets = [
275 {
276 'inputs': {
277 'input_structure_filename': None,
278 'input_trajectory_filename': mdtraj_supported_trajectory_formats
279 },
280 'outputs': {
281 'output_frame_filename': mdtraj_supported_trajectory_formats
282 }
283 },
284 {
285 'inputs': {
286 'input_structure_filename': mdtraj_supported_structure_formats,
287 'input_trajectory_filename': mdtraj_supported_trajectory_formats
288 },
289 'outputs': {
290 'output_frame_filename': { *mdtraj_supported_structure_formats, *mdtraj_supported_trajectory_formats }
291 }
292 },
293]
294get_first_frame.format_sets = single_frame_getter_format_sets
296# Split a trajectory which is actually a merge of independent trajectories back to the original pieces
297# Run an RMSD analysis to guess where the pieces are
298# The cutoff sets the minimum RMSD change to consider it is a different trajectory
299def split_merged_trajectories (
300 input_structure_filename : str,
301 input_trajectory_filename : str,
302 sudden_jump_cutoff : float = 0.2,
303 output_trajectory_prefix : str = 'split'):
304 # Get the input trajectory format
305 input_trajectory_file = File(input_trajectory_filename)
306 input_trajectory_format = input_trajectory_file.format
307 # The cutoff must be a negative number since independent trajectories RMSD sudden jumps will be negative
308 cutoff = -abs(sudden_jump_cutoff)
309 # Load the trajectory
310 trajectory = mdt.load(input_trajectory_filename, top=input_structure_filename)
311 # Run a RMSD analysis
312 rmsd_data = mdt.rmsd(trajectory, trajectory, 0)
313 # Find sudden jumps in RMSD values
314 sudden_jumps = []
315 previous_value = 0
316 for i, value in enumerate(rmsd_data):
317 diff = value - previous_value
318 # In case this is a new trajectory the jump will be negative
319 if diff < cutoff:
320 print('New trajectory at frame ' + str(i))
321 sudden_jumps.append(i)
322 previous_value = value
323 # In there was no jumps then stop here
324 if len(sudden_jumps) == 0:
325 print('Apparently there is a single trajectory')
326 return
327 # Generate a trajectory subset for each cut
328 cut_points = [ 0, *sudden_jumps, len(rmsd_data) ]
329 for i in range(len(cut_points) -1):
330 start = cut_points[i]
331 end = cut_points[i+1]
332 trajectory_split = trajectory[start:end]
333 split_filename = output_trajectory_prefix + '_' + str(i+1) + '.' + input_trajectory_format
334 print('Writting from frame ' + str(start) + ' to frame ' + str(end) + ' to "' + split_filename + '"')
335 trajectory_split.save(split_filename)
337# Sort atoms in a trajectory file by sorting its coordinates
338# A new atom indices list is expected in order to do the sorting
339def sort_trajectory_atoms (
340 input_structure_file : 'File',
341 input_trajectory_file : 'File',
342 output_trajectory_file : 'File',
343 new_atom_indices : List[int]
344):
346 # Load the topology, which is used further
347 topology = mdt.load_topology(input_structure_file.path)
349 # Check the new atom indices to match the number of atoms in the topology
350 if len(new_atom_indices) != topology.n_atoms:
351 raise ValueError('The number of atom indices (' + str(len(new_atom_indices)) + ') does not match the number of atoms in topology(' + str(topology.n_atoms) + ')' )
353 # If the input and the output trajectory filenames are equal then we must rename the input filename to not overwrite it
354 if input_trajectory_file.path == output_trajectory_file.path:
355 backup_file = input_trajectory_file.get_prefixed_file('backup.')
356 os.rename(input_trajectory_file.path, backup_file.path)
357 input_trajectory_file = backup_file
359 # If the output trajectory file already exists at this point then we must stop here
360 # The raw trjcat implementation will not join things to the end of it
361 if output_trajectory_file.exists:
362 raise SystemExit('The output file already exists and overwrite is not supported for this functionality')
364 # Load the trajectory frame by frame
365 trajectory = mdt.iterload(input_trajectory_file.path, top=input_structure_file.path, chunk=1)
366 frame_filename = '.frame.xtc'
368 # Print an empty line for the first 'ERASE_PREVIOUS_LINE' to not delete a previous log
369 print()
371 for f, frame in enumerate(trajectory):
372 # Update the current frame log
373 print(ERASE_PREVIOUS_LINE)
374 print('Frame ' + str(f))
376 # Sort coordinates according to new atom indices
377 xyz = frame.xyz[0]
378 new_xyz = np.empty(shape=xyz.shape)
379 for i, atom_index in enumerate(new_atom_indices):
380 new_xyz[i] = xyz[atom_index]
382 # Export the current frame
383 new_frame = mdt.Trajectory([new_xyz],
384 topology=topology,
385 time=frame.time, # This is necessary for the new mdtraj trajectory to be functional
386 unitcell_lengths=frame.unitcell_lengths, # This is necessary for the new mdtraj trajectory to be functional
387 unitcell_angles=frame.unitcell_angles # This is necessary for the new mdtraj trajectory to be functional
388 )
389 new_frame.save(frame_filename)
390 # Join current frame to the output trajectory
391 merge_xtc_files(output_trajectory_file.path, frame_filename)
393 # Remove the residual file
394 # WARNING: It may not exist if the trajectory has 1 frame
395 if exists(frame_filename):
396 os.remove(frame_filename)