Coverage for mddb_workflow/tools/process_input_files.py: 78%
192 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1from mddb_workflow.utils.auxiliar import InputError, MISSING_TOPOLOGY, warn
2from mddb_workflow.utils.constants import STRUCTURE_FILENAME, TRAJECTORY_FILENAME
3from mddb_workflow.utils.constants import CONVERTED_STRUCTURE, CONVERTED_TRAJECTORY
4from mddb_workflow.utils.constants import FILTERED, FILTERED_STRUCTURE, FILTERED_TRAJECTORY
5from mddb_workflow.utils.constants import IMAGED, IMAGED_STRUCTURE, IMAGED_TRAJECTORY
6from mddb_workflow.utils.constants import CORRECTED_STRUCTURE, CORRECTED_TRAJECTORY
7from mddb_workflow.utils.constants import INCOMPLETE_PREFIX, CG_ATOM_ELEMENT, SNAPSHOTS_FLAG
8from mddb_workflow.utils.file import File
9from mddb_workflow.utils.structures import Structure
10from mddb_workflow.utils.pyt_spells import get_frames_count
11from mddb_workflow.utils.arg_cksum import get_cksum_id
12from mddb_workflow.utils.type_hints import *
14from mddb_workflow.tools.check_inputs import check_inputs, PREFILTERED_TOPOLOGY_EXCEPTION
15from mddb_workflow.tools.conversions import convert
16from mddb_workflow.tools.filter_atoms import filter_atoms
17from mddb_workflow.tools.image_and_fit import image_and_fit
18from mddb_workflow.tools.get_charges import get_charges
19from mddb_workflow.tools.structure_corrector import structure_corrector
22def is_amber_top (input_topology_file : 'File') -> bool:
23 """ Check if a .top file is from Amber.
24 Returns True if it is Amber, False if it is Gromacs. """
25 if input_topology_file != MISSING_TOPOLOGY and \
26 input_topology_file.extension == 'top':
27 with open(input_topology_file.path, 'r') as f:
28 lines = f.readlines(5)
30 # If all non-empty first words are '%' assume Amber (.prmtop)
31 first_words = {line.split()[0] for line in lines if line.strip()}
32 if '%VERSION' in first_words:
33 return True
35 # If any line starts with ';' or '[' assume Gromacs (.top)
36 first_chars = {word[0] for word in first_words}
37 if any(c in (';', '[') for c in first_chars):
38 return False
40 # Otherwise we cannot decide
41 raise InputError('Unable to infer topology format from first five lines')
43 return False
46def process_input_files (
47 input_structure_file : 'File',
48 input_trajectory_files : 'File',
49 input_topology_file : 'File',
50 # Output
51 output_directory : str,
52 topology_filepath : str,
53 # Processing parameters
54 filter_selection : str,
55 image : bool,
56 fit : bool,
57 translation : tuple,
58 # Make sure the MD is used only to set values or use its functions, but not to get values
59 # Values msut be passed separatedly as inputs so the taks can identify when inputs change
60 self : 'MD',
61 # Get the task which is calling this function
62 # Thus we may know which inputs have changed compared to previous runs
63 task : 'Task',
64):
65 """ Process input files to generate the processed files.
66 This process corrects and standarizes the topology, the trajectory and the structure. """
68 # Make sure we do not enter in a loop
69 # This may happen when we read/call an output value/file by mistake
70 if hasattr(self, '_processed'): raise RuntimeError('Looped processing')
71 self._processed = True
73 # Input trajectories should have all the same format
74 input_trajectory_formats = set([ trajectory_file.format for trajectory_file in input_trajectory_files ])
75 if len(input_trajectory_formats) > 1:
76 raise InputError('All input trajectory files must have the same format')
78 # Set the output filepaths
79 # Note that these outputs aim for the MD directory, not for the task directory
80 output_structure_filepath = self.pathify(STRUCTURE_FILENAME)
81 output_structure_file = File(output_structure_filepath)
82 output_trajectory_filepath = self.pathify(TRAJECTORY_FILENAME)
83 output_trajectory_file = File(output_trajectory_filepath)
84 output_topology_filepath = topology_filepath
85 output_topology_file = File(output_topology_filepath) if output_topology_filepath else MISSING_TOPOLOGY
87 # --- TOPOLOGY FORMAT ASSUMTION ---------------------------------------------------------
89 # Make a super fast check and an assumption
90 # Topologies with the .top extension for us are considered Gromacs topology format
91 # However it is usual than Amber topologies (ideally .prmtop) are also '.top'
92 # So if the trajectory is Amber and the topology is .top then assume it is Amber
93 input_trajectories_format = list(input_trajectory_formats)[0]
94 if is_amber_top(input_topology_file):
95 # Creating a topology symlink/copy with the correct extension
96 warn(f'Topology is .top but the trajectory is from Amber. It is assumed the topology is .prmtop')
97 reformatted_topology_file = input_topology_file.reformat('prmtop')
98 output_topology_file.path = output_topology_file.extensionless_filepath+'.prmtop'
99 if input_structure_file == input_topology_file:
100 input_structure_file = reformatted_topology_file
101 input_topology_file = reformatted_topology_file
103 # --- FIRST CHECK -----------------------------------------------------------------------
105 # Check input files match in number of atoms
106 # Here we have not standarized the format so we must check differently with every format
107 exceptions = check_inputs(input_structure_file, input_trajectory_files, input_topology_file)
109 # There is a chance that the inputs checker has prefiltered the topology to match trajectory
110 # If this is the case then use the prefiltered topology from now on
111 prefiltered_topology = exceptions.get(PREFILTERED_TOPOLOGY_EXCEPTION, None)
112 if prefiltered_topology:
113 if input_structure_file == input_topology_file:
114 input_structure_file = prefiltered_topology
115 input_topology_file = prefiltered_topology
117 # --- CONVERTING AND MERGING ------------------------------------------------------------
119 # Set the output format for the already converted structure
120 input_structure_format = input_structure_file.format
121 output_structure_format = output_structure_file.format
122 converted_structure_filepath = f'{output_directory}/{CONVERTED_STRUCTURE}'
123 # If input structure already matches the output format then avoid the renaming
124 if input_structure_format == output_structure_format:
125 converted_structure_filepath = input_structure_file.path
126 # Set the output file for the already converted structure
127 converted_structure_file = File(converted_structure_filepath)
128 # Set the output format for the already converted trajectory
129 input_trajectories_format = list(input_trajectory_formats)[0]
130 output_trajectory_format = output_trajectory_file.format
131 # Set the output file for the already converted trajectory
132 converted_trajectory_filepath = f'{output_directory}/{CONVERTED_TRAJECTORY}'
133 # If input trajectory already matches the output format and is unique then avoid the renaming
134 if input_trajectories_format == output_trajectory_format and len(input_trajectory_files) == 1:
135 converted_trajectory_filepath = input_trajectory_files[0].path
136 converted_trajectory_file = File(converted_trajectory_filepath)
137 # Join all input trajectory paths
138 input_trajectory_paths = [ trajectory_file.path for trajectory_file in input_trajectory_files ]
140 # Set an intermeidate file for the trajectory while it is being converted
141 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while converting
142 incompleted_converted_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + CONVERTED_TRAJECTORY}'
143 incompleted_converted_trajectory_file = File(incompleted_converted_trajectory_filepath)
144 # If there is an incomplete trajectory then remove it
145 if incompleted_converted_trajectory_file.exists:
146 incompleted_converted_trajectory_file.remove()
148 # Convert input structure and trajectories to output structure and trajectory
149 if not converted_structure_file.exists or not converted_trajectory_file.exists:
150 print(' * Converting and merging')
151 convert(
152 input_structure_filepath = input_structure_file.path,
153 output_structure_filepath = converted_structure_file.path,
154 input_trajectory_filepaths = input_trajectory_paths,
155 output_trajectory_filepath = incompleted_converted_trajectory_file.path,
156 )
157 # Once converted, rename the trajectory file as completed
158 # If the converted trajectory already exists then it means it is the input trajectory
159 if converted_trajectory_file.exists:
160 incompleted_converted_trajectory_file.remove()
161 else:
162 incompleted_converted_trajectory_file.rename_to(converted_trajectory_file)
164 # Topologies are never converted, but they are kept in their original format
166 # --- provisional reference structure ---
168 # Now that we MUST have a PDB file we can set a provisional structure instance
169 # Note that this structure is not yet corrected so it must be used with care
170 # Otherwise we could have silent errors
171 provisional_structure = Structure.from_pdb_file(converted_structure_file.path)
172 # Now we can set a provisional coarse grain selection
173 # This selection is useful to avoid problems with CG atom elements
174 # Since this is proviosonal we will make it silent
175 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False)
176 for atom_index in provisional_cg_selection.atom_indices:
177 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT
179 # --- FILTERING ATOMS ------------------------------------------------------------
181 # Find out if we need to filter
182 # i.e. check if there is a selection filter and it matches some atoms
183 must_filter = bool(filter_selection)
185 # Set output filenames for the already filtered structure and trajectory
186 # Note that this is the only step affecting topology and thus here we output the definitive topology
187 filtered_structure_filepath = f'{output_directory}/{FILTERED_STRUCTURE}'
188 if not must_filter:
189 filtered_structure_filepath = converted_structure_filepath
190 filtered_structure_file = File(filtered_structure_filepath)
191 filtered_trajectory_filepath = f'{output_directory}/{FILTERED_TRAJECTORY}'
192 if not must_filter:
193 filtered_trajectory_filepath = converted_trajectory_filepath
194 filtered_trajectory_file = File(filtered_trajectory_filepath)
195 filtered_topology_file = output_topology_file if must_filter else input_topology_file
197 # Set an intermeidate file for the trajectory while it is being filtered
198 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while filtering
199 incompleted_filtered_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + FILTERED_TRAJECTORY}'
200 incompleted_filtered_trajectory_file = File(incompleted_filtered_trajectory_filepath)
201 # If there is an incomplete trajectory then remove it
202 if incompleted_filtered_trajectory_file.exists:
203 incompleted_filtered_trajectory_file.remove()
205 # Check if any output file is missing
206 missing_filter_output = not filtered_structure_file.exists \
207 or not filtered_trajectory_file.exists \
208 or (filtered_topology_file != MISSING_TOPOLOGY and not filtered_topology_file.exists)
210 # Check if parameters have changed
211 # Note that for this specific step only filtering is important
212 previous_filtered_parameters = self.cache.retrieve(FILTERED)
213 same_filtered_parameters = previous_filtered_parameters == filter_selection
215 # Filter atoms in structure, trajectory and topology if required and not done yet
216 if must_filter and (missing_filter_output or not same_filtered_parameters):
217 print(' * Filtering atoms')
218 filter_atoms(
219 input_structure_file = converted_structure_file,
220 input_trajectory_file = converted_trajectory_file,
221 input_topology_file = input_topology_file, # We use input topology
222 output_structure_file = filtered_structure_file,
223 output_trajectory_file = incompleted_filtered_trajectory_file,
224 output_topology_file = filtered_topology_file, # We genereate the definitive topology
225 reference_structure = provisional_structure,
226 filter_selection = filter_selection,
227 )
228 # Once filetered, rename the trajectory file as completed
229 # If the filtered trajectory already exists then it means it is the input trajectory
230 if filtered_trajectory_file.exists:
231 incompleted_filtered_trajectory_file.remove()
232 else:
233 incompleted_filtered_trajectory_file.rename_to(filtered_trajectory_file)
234 self.cache.update(FILTERED, filter_selection)
236 # --- provisional reference structure ---
238 # Now that we have a filtered PDB file we have to update provisional structure instance
239 # Note that this structure is not yet corrected so it must be used with care
240 # Otherwise we could have silent errors
241 provisional_structure = Structure.from_pdb_file(filtered_structure_file.path)
242 # Again, set the coarse grain atoms
243 # Since elements may be needed to guess PBC selection we must solve them right before
244 # Since this is proviosonal we will make it silent
245 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False)
246 for atom_index in provisional_cg_selection.atom_indices:
247 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT
248 # Also we can set a provisional PBC selection
249 # This selection is useful both for imaging/fitting and for the correction
250 # We will make sure that the provisonal and the final PBC selections match
251 # Since this is proviosonal we will make it silent
252 provisional_pbc_selection = self._set_pbc_selection(provisional_structure, verbose=False)
254 # --- IMAGING AND FITTING ------------------------------------------------------------
256 # There is no logical way to know if the trajectory is already imaged or it must be imaged
257 # We rely exclusively in input flags
258 must_image = image or fit
260 # Set output filenames for the already filtered structure and trajectory
261 imaged_structure_filepath = f'{output_directory}/{IMAGED_STRUCTURE}'
262 if not must_image:
263 imaged_structure_filepath = filtered_structure_filepath
264 imaged_structure_file = File(imaged_structure_filepath)
265 imaged_trajectory_filepath = f'{output_directory}/{IMAGED_TRAJECTORY}'
266 if not must_image:
267 imaged_trajectory_filepath = filtered_trajectory_filepath
268 imaged_trajectory_file = File(imaged_trajectory_filepath)
270 # Set an intermeidate file for the trajectory while it is being imaged
271 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while imaging
272 incompleted_imaged_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + IMAGED_TRAJECTORY}'
273 incompleted_imaged_trajectory_file = File(incompleted_imaged_trajectory_filepath)
274 # If there is an incomplete trajectory then remove it
275 if incompleted_imaged_trajectory_file.exists:
276 incompleted_imaged_trajectory_file.remove()
278 # Check if any output file is missing
279 missing_imaged_output = not imaged_structure_file.exists or not imaged_trajectory_file.exists
281 # Check if parameters have changed
282 # Note that for this step the filter parameters is also important
283 previous_imaged_parameters = self.cache.retrieve(IMAGED)
284 same_imaged_parameters = previous_imaged_parameters == [image, fit, *translation]
286 # Image the trajectory if it is required
287 # i.e. make the trajectory uniform avoiding atom jumps and making molecules to stay whole
288 # Fit the trajectory by removing the translation and rotation if it is required
289 if must_image and (missing_imaged_output or not same_imaged_parameters):
290 print(' * Imaging and fitting')
291 image_and_fit(
292 input_structure_file = filtered_structure_file,
293 input_trajectory_file = filtered_trajectory_file,
294 input_topology_file = filtered_topology_file, # This is optional if there are no PBC residues
295 output_structure_file = imaged_structure_file,
296 output_trajectory_file = incompleted_imaged_trajectory_file,
297 image = image,
298 fit = fit,
299 translation = translation,
300 structure = provisional_structure,
301 pbc_selection = provisional_pbc_selection
302 )
303 # Once imaged, rename the trajectory file as completed
304 # If the imaged trajectory already exists then it means it is the input trajectory
305 if imaged_trajectory_file.exists:
306 incompleted_imaged_trajectory_file.remove()
307 else:
308 incompleted_imaged_trajectory_file.rename_to(imaged_trajectory_file)
309 # Update the cache
310 self.cache.update(IMAGED, [image, fit, *translation])
311 # Update the provisional strucutre coordinates
312 imaged_structure = Structure.from_pdb_file(imaged_structure_file.path)
313 imaged_structure_coords = [ atom.coords for atom in imaged_structure.atoms ]
314 provisional_structure.set_new_coordinates(imaged_structure_coords)
316 # --- CORRECTING STRUCTURE ------------------------------------------------------------
318 # Note that this step, although it is foucsed in the structure, requires also the trajectory
319 # Also the trajectory may be altered in very rare cases where coordinates must be resorted
321 # There is no possible reason to not correct the structure
322 # This is the last step so the output files will be named as the output files of the whole processing
324 # WARNING:
325 # For the correcting function we need the number of snapshots and at this point it should not be defined
326 # Snapshots are calculated by default from the already processed structure and trajectory
327 # For this reason we can not rely on the public snapshots getter
328 # We must calculate snapshots here using last step structure and trajectory
329 snapshots = None
330 # If we already have a value in the cache then use it, unless the input trajectory has changed
331 same_trajectory = 'input_trajectory_files' not in task.changed_inputs
332 if same_trajectory: snapshots = self.cache.retrieve(SNAPSHOTS_FLAG)
333 # Calculate the new value
334 if snapshots == None:
335 snapshots = get_frames_count(imaged_structure_file, imaged_trajectory_file)
336 # Update the MD snapshots value
337 self.get_snapshots._set_parent_output(self, snapshots)
338 # Save the snapshots value in the cache as well
339 self.cache.update(SNAPSHOTS_FLAG, snapshots)
341 # WARNING:
342 # We may need to resort atoms in the structure corrector function
343 # In such case, bonds and charges must be resorted as well and saved apart to keep values coherent
344 # Bonds are calculated during the structure corrector but atom charges must be extracted no
345 charges = get_charges(filtered_topology_file)
346 self.project.get_charges._set_parent_output(self.project, charges)
348 print(' * Correcting structure')
350 # Set output filenames for the already filtered structure and trajectory
351 corrected_structure_filepath = f'{output_directory}/{CORRECTED_STRUCTURE}'
352 corrected_structure_file = File(corrected_structure_filepath)
353 corrected_trajectory_filepath = f'{output_directory}/{CORRECTED_TRAJECTORY}'
354 corrected_trajectory_file = File(corrected_trajectory_filepath)
356 # Correct the structure
357 # This function reads and or modifies the following MD variables:
358 # snapshots, reference_bonds, register, cache, mercy, trust
359 structure_corrector(
360 structure = provisional_structure,
361 input_trajectory_file = imaged_trajectory_file,
362 input_topology_file = filtered_topology_file,
363 output_structure_file = corrected_structure_file,
364 output_trajectory_file = corrected_trajectory_file,
365 MD = self,
366 pbc_selection = provisional_pbc_selection,
367 snapshots = snapshots,
368 register = self.register,
369 mercy = self.project.mercy,
370 trust = self.project.trust,
371 guess_bonds = self.project.guess_bonds
372 )
374 # If the corrected output exists then use it
375 # Otherwise use the previous step files
376 # Corrected files are generated only when changes are made in these files
377 corrected_structure_file = corrected_structure_file if corrected_structure_file.exists else imaged_structure_file
378 corrected_trajectory_file = corrected_trajectory_file if corrected_trajectory_file.exists else imaged_trajectory_file
380 # Set for every type of file (structure, trajectory and topology) the input, the last processed step and the output files
381 input_and_output_files = [
382 (input_structure_file, corrected_structure_file, output_structure_file),
383 (input_trajectory_files[0], corrected_trajectory_file, output_trajectory_file),
384 (input_topology_file, filtered_topology_file, output_topology_file)
385 ]
386 # Set a list of intermediate files
387 intermediate_files = set([
388 converted_structure_file, converted_trajectory_file,
389 filtered_structure_file, filtered_trajectory_file,
390 imaged_structure_file, imaged_trajectory_file,
391 ])
392 # Now we must rename files to match the output file
393 # Processed files remain with some intermediate filename
394 for input_file, processed_file, output_file in input_and_output_files:
395 # If the processed file is already the output file then there is nothing to do here
396 # This means it was already the input file and no changes were made
397 if processed_file == output_file: continue
398 # There is a chance that the input files have not been modified
399 # This means the input format has already the output format and it is not to be imaged, fitted or corrected
400 # However we need the output files to exist and we dont want to rename the original ones to conserve them
401 # In order to not duplicate data, we will setup a symbolic link to the input files with the output filepaths
402 if processed_file == input_file:
403 # If output file exists and its the same as the input file, we can not create a symlink from a file to the same file
404 if output_file.exists:
405 output_file.remove()
406 output_file.set_symlink_to(input_file)
407 # Otherwise rename the last intermediate file as the output file
408 else:
409 # In case the processed file is a symlink we must make sure the symlink is not made to a intermediate step
410 # Intermediate steps will be removed further and thus the symlink would break
411 # If the symlinks points to the input file there is no problem though
412 if processed_file.is_symlink():
413 target_file = processed_file.get_symlink()
414 if target_file in intermediate_files:
415 target_file.rename_to(output_file)
416 else:
417 processed_file.rename_to(output_file)
418 # If the files is not a symlink then simply rename it
419 else:
420 processed_file.rename_to(output_file)
422 # Save the internal variables
423 self._structure_file = output_structure_file
424 self._trajectory_file = output_trajectory_file
425 self.project._topology_file = output_topology_file
427 # If the input and output file have the same name then overwrite input cksums
428 # Thus we avoid this task to run forever
429 # DANI: Esto es provisional, hay que prohibir que los inputs se llamen como los outputs
430 if input_structure_file == output_structure_file:
431 task.cache_cksums['input_structure_file'] = get_cksum_id(output_structure_file)
432 if len(input_trajectory_files) == 1 and input_trajectory_files[0] == output_trajectory_file:
433 task.cache_cksums['input_trajectory_files'] = get_cksum_id([ output_trajectory_file ])
434 if input_topology_file == output_topology_file:
435 task.cache_cksums['input_topology_file'] = get_cksum_id(output_topology_file)
437 # --- Definitive PBC selection ---
439 # Now that we have the corrected structure we can set the definitive PBC atoms
440 # Make sure the selection is identical to the provisional selection
441 if self.pbc_selection != provisional_pbc_selection:
442 raise InputError('PBC selection is not consistent after correcting the structure. '
443 'Please consider using a different PBC selection. '
444 'Avoid relying in atom distances or elements to avoid this problem.')
446 # --- RUNNING FINAL TESTS ------------------------------------------------------------
448 # Note that some tests have been run already
449 # e.g. stable bonds is run in the structure corrector function
451 # Note that tests here do not modify any file
453 # Check the trajectory has not sudden jumps
454 self.is_trajectory_integral()
456 # Make a final test summary
457 self.print_tests_summary()
459 # Issue some warnings if failed or never run tests are skipped
460 self._issue_required_test_warnings()
462 # --- Cleanup intermediate files
464 # Set a list of input files to NOT be removed
465 inputs_files = set([ input_structure_file, *input_trajectory_files, input_topology_file ])
466 # We must make sure an intermediate file is not actually an input file before deleting it
467 removable_files = intermediate_files - inputs_files
468 # Now delete every removable file
469 for removable_file in removable_files:
470 # Note that a broken symlink does not 'exists'
471 if removable_file.exists or removable_file.is_symlink():
472 removable_file.remove()
474 # We must leave something in the directory
475 # Otherwise the task will find out it is empty and will delete it
476 # And we don't want to delete it or it will be done again in the next run
477 # To solve this leave some symbolic file
478 if all(self.register.tests.values()):
479 manifest_filepath = f'{output_directory}/manifest.txt'
480 with open(manifest_filepath, 'w') as file:
481 file.write('Input files were processed sucessfully.\n' +
482 'Intermediate files were removed to save space.\n'
483 'Please do not remove this file\n')