Coverage for mddb_workflow / tools / process_input_files.py: 69%
185 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1from mddb_workflow.utils.auxiliar import InputError, MISSING_TOPOLOGY, warn
2from mddb_workflow.utils.constants import CONVERTED_STRUCTURE, CONVERTED_TRAJECTORY
3from mddb_workflow.utils.constants import FILTERED, FILTERED_STRUCTURE, FILTERED_TRAJECTORY
4from mddb_workflow.utils.constants import IMAGED, IMAGED_STRUCTURE, IMAGED_TRAJECTORY
5from mddb_workflow.utils.constants import CORRECTED_STRUCTURE, CORRECTED_TRAJECTORY
6from mddb_workflow.utils.constants import INCOMPLETE_PREFIX, CG_ATOM_ELEMENT, SNAPSHOTS_FLAG
7from mddb_workflow.utils.constants import FAITH_BYPASS
8from mddb_workflow.utils.file import File
9from mddb_workflow.utils.structures import Structure
10from mddb_workflow.utils.pyt_spells import get_frames_count
11from mddb_workflow.utils.arg_cksum import get_cksum_id
12from mddb_workflow.utils.type_hints import *
14from mddb_workflow.tools.check_inputs import check_inputs, PREFILTERED_TOPOLOGY_EXCEPTION
15from mddb_workflow.tools.conversions import convert
16from mddb_workflow.tools.filter_atoms import filter_atoms
17from mddb_workflow.tools.image_and_fit import image_and_fit
18from mddb_workflow.tools.get_charges import get_charges
19from mddb_workflow.tools.structure_corrector import structure_corrector
22def is_amber_top (input_topology_file : 'File') -> bool:
23 """Check if a .top file is from Amber.
24 Returns True if it is Amber, False if it is Gromacs.
25 """
26 if input_topology_file != MISSING_TOPOLOGY and \
27 input_topology_file.extension == 'top':
28 with open(input_topology_file.path, 'r') as f:
29 lines = f.readlines(5)
31 # If all non-empty first words are '%' assume Amber (.prmtop)
32 first_words = {line.split()[0] for line in lines if line.strip()}
33 if '%VERSION' in first_words:
34 return True
36 # If any line starts with ';' or '[' assume Gromacs (.top)
37 first_chars = {word[0] for word in first_words}
38 if any(c in (';', '[') for c in first_chars):
39 return False
41 # Otherwise we cannot decide
42 raise InputError('Unable to infer topology format from first five lines')
43 return False
46def process_input_files (
47 input_structure_file : 'File',
48 input_trajectory_files : 'File',
49 input_topology_file : 'File',
50 # Output
51 output_directory : str,
52 output_topology_file : 'File',
53 output_structure_file : 'File',
54 output_trajectory_file : 'File',
55 # Processing parameters
56 filter_selection : str,
57 image : bool,
58 fit : bool,
59 translation : tuple,
60 # Make sure the MD is used only to set values or use its functions, but not to get values
61 # Values must be passed separately as inputs so the task can identify when inputs change
62 self : 'MD',
63 # Get the task which is calling this function
64 # Thus we may know which inputs have changed compared to previous runs
65 task : 'Task',
66 # The faith argument
67 faith : bool,
68):
69 """Process input files to generate the processed files.
70 This process corrects and standarizes the topology, the trajectory and the structure.
71 """
72 # Make sure we do not enter in a loop
73 # This may happen when we read/call an output value/file by mistake
74 if hasattr(self, '_processed'): raise RuntimeError('Looped processing')
75 self._processed = True
77 # Input trajectories should have all the same format
78 input_trajectory_formats = set([ trajectory_file.format for trajectory_file in input_trajectory_files ])
79 if len(input_trajectory_formats) > 1:
80 raise InputError('All input trajectory files must have the same format')
82 # If the faith argument was passed then set input files as output files and exit
83 if faith:
84 self.register.add_warning(FAITH_BYPASS, 'All processing steps and tests were skipped')
85 if output_structure_file.exists: output_structure_file.remove()
86 output_structure_file.set_symlink_to(input_structure_file)
87 if output_trajectory_file.exists: output_trajectory_file.remove()
88 output_trajectory_file.set_symlink_to(input_trajectory_files[0])
89 if output_topology_file != MISSING_TOPOLOGY:
90 if output_topology_file.exists: output_topology_file.remove()
91 output_topology_file.set_symlink_to(input_topology_file)
92 return
93 self.register.remove_warnings(FAITH_BYPASS)
95 # --- TOPOLOGY FORMAT ASSUMTION ---------------------------------------------------------
97 # Make a super fast check and an assumption
98 # Topologies with the .top extension for us are considered Gromacs topology format
99 # However it is usual than Amber topologies (ideally .prmtop) are also '.top'
100 # So if the trajectory is Amber and the topology is .top then assume it is Amber
101 input_trajectories_format = list(input_trajectory_formats)[0]
102 if is_amber_top(input_topology_file):
103 # Creating a topology symlink/copy with the correct extension
104 warn('Topology is .top but the trajectory is from Amber. It is assumed the topology is .prmtop')
105 reformatted_topology_file = input_topology_file.reformat('prmtop')
106 output_topology_file.path = output_topology_file.extensionless_filepath + '.prmtop'
107 if input_structure_file == input_topology_file:
108 input_structure_file = reformatted_topology_file
109 input_topology_file = reformatted_topology_file
111 # --- FIRST CHECK -----------------------------------------------------------------------
113 # Check input files match in number of atoms
114 # Here we have not standarized the format so we must check differently with every format
115 exceptions = check_inputs(input_structure_file, input_trajectory_files, input_topology_file)
117 # There is a chance that the inputs checker has prefiltered the topology to match trajectory
118 # If this is the case then use the prefiltered topology from now on
119 prefiltered_topology = exceptions.get(PREFILTERED_TOPOLOGY_EXCEPTION, None)
120 if prefiltered_topology:
121 if input_structure_file == input_topology_file:
122 input_structure_file = prefiltered_topology
123 input_topology_file = prefiltered_topology
125 # --- CONVERTING AND MERGING ------------------------------------------------------------
127 # Set the output format for the already converted structure
128 input_structure_format = input_structure_file.format
129 output_structure_format = output_structure_file.format
130 # Set the output file for the already converted structure
131 # If input structure already matches the output format then avoid the renaming
132 if input_structure_format == output_structure_format:
133 converted_structure_file = input_structure_file
134 else:
135 converted_structure_file = File(f'{output_directory}/{CONVERTED_STRUCTURE}')
136 # Set the output format for the already converted trajectory
137 input_trajectories_format = list(input_trajectory_formats)[0]
138 output_trajectory_format = output_trajectory_file.format
139 # Set the output file for the already converted trajectory
140 # If input trajectory already matches the output format and is unique then avoid the renaming
141 if input_trajectories_format == output_trajectory_format and len(input_trajectory_files) == 1:
142 converted_trajectory_file = input_trajectory_files[0]
143 else:
144 converted_trajectory_file = File(f'{output_directory}/{CONVERTED_TRAJECTORY}')
145 # Join all input trajectory paths
146 input_trajectory_paths = [ trajectory_file.path for trajectory_file in input_trajectory_files ]
148 # Set an intermeidate file for the trajectory while it is being converted
149 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while converting
150 incompleted_converted_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + CONVERTED_TRAJECTORY}'
151 incompleted_converted_trajectory_file = File(incompleted_converted_trajectory_filepath)
152 # If there is an incomplete trajectory then remove it
153 if incompleted_converted_trajectory_file.exists:
154 incompleted_converted_trajectory_file.remove()
156 # Convert input structure and trajectories to output structure and trajectory
157 if not converted_structure_file.exists or not converted_trajectory_file.exists:
158 print(' * Converting and merging')
159 convert(
160 input_structure_filepath = input_structure_file.path,
161 output_structure_filepath = converted_structure_file.path,
162 input_trajectory_filepaths = input_trajectory_paths,
163 output_trajectory_filepath = incompleted_converted_trajectory_file.path,
164 )
165 # Once converted, rename the trajectory file as completed
166 # If the converted trajectory already exists then it means it is the input trajectory
167 if converted_trajectory_file.exists:
168 incompleted_converted_trajectory_file.remove()
169 else:
170 incompleted_converted_trajectory_file.rename_to(converted_trajectory_file)
172 # Topologies are never converted, but they are kept in their original format
174 # --- provisional reference structure ---
176 # Now that we MUST have a PDB file we can set a provisional structure instance
177 # Note that this structure is not yet corrected so it must be used with care
178 # Otherwise we could have silent errors
179 provisional_structure = Structure.from_pdb_file(converted_structure_file.path)
180 # Now we can set a provisional coarse grain selection
181 # This selection is useful to avoid problems with CG atom elements
182 # Since this is proviosonal we will make it silent
183 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False)
184 for atom_index in provisional_cg_selection.atom_indices:
185 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT
187 # --- FILTERING ATOMS ------------------------------------------------------------
189 # Find out if we need to filter
190 # i.e. check if there is a selection filter and it matches some atoms
191 must_filter = bool(filter_selection)
193 # Set output filenames for the already filtered structure and trajectory
194 if must_filter:
195 filtered_structure_file = File(f'{output_directory}/{FILTERED_STRUCTURE}')
196 filtered_trajectory_file = File(f'{output_directory}/{FILTERED_TRAJECTORY}')
197 else:
198 filtered_structure_file = converted_structure_file
199 filtered_trajectory_file = converted_trajectory_file
200 # Note that this is the only step affecting topology and thus here we output the definitive topology
201 filtered_topology_file = output_topology_file if must_filter else input_topology_file
203 # Set an intermeidate file for the trajectory while it is being filtered
204 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while filtering
205 incompleted_filtered_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + FILTERED_TRAJECTORY}'
206 incompleted_filtered_trajectory_file = File(incompleted_filtered_trajectory_filepath)
207 # If there is an incomplete trajectory then remove it
208 if incompleted_filtered_trajectory_file.exists:
209 incompleted_filtered_trajectory_file.remove()
211 # Check if any output file is missing
212 missing_filter_output = not filtered_structure_file.exists \
213 or not filtered_trajectory_file.exists \
214 or (filtered_topology_file != MISSING_TOPOLOGY and not filtered_topology_file.exists)
216 # Check if parameters have changed
217 # Note that for this specific step only filtering is important
218 previous_filtered_parameters = self.cache.retrieve(FILTERED)
219 same_filtered_parameters = previous_filtered_parameters == filter_selection
221 # Filter atoms in structure, trajectory and topology if required and not done yet
222 if must_filter and (missing_filter_output or not same_filtered_parameters):
223 print(' * Filtering atoms')
224 filter_atoms(
225 input_structure_file = converted_structure_file,
226 input_trajectory_file = converted_trajectory_file,
227 input_topology_file = input_topology_file, # We use input topology
228 output_structure_file = filtered_structure_file,
229 output_trajectory_file = incompleted_filtered_trajectory_file,
230 output_topology_file = filtered_topology_file, # We genereate the definitive topology
231 reference_structure = provisional_structure,
232 filter_selection = filter_selection,
233 )
234 # Once filetered, rename the trajectory file as completed
235 # If the filtered trajectory already exists then it means it is the input trajectory
236 if filtered_trajectory_file.exists:
237 incompleted_filtered_trajectory_file.remove()
238 else:
239 incompleted_filtered_trajectory_file.rename_to(filtered_trajectory_file)
240 self.cache.update(FILTERED, filter_selection)
242 # --- provisional reference structure ---
244 # Now that we have a filtered PDB file we have to update provisional structure instance
245 # Note that this structure is not yet corrected so it must be used with care
246 # Otherwise we could have silent errors
247 provisional_structure = Structure.from_pdb_file(filtered_structure_file.path)
248 # Again, set the coarse grain atoms
249 # Since elements may be needed to guess PBC selection we must solve them right before
250 # Since this is proviosonal we will make it silent
251 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False)
252 for atom_index in provisional_cg_selection.atom_indices:
253 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT
254 # Also we can set a provisional PBC selection
255 # This selection is useful both for imaging/fitting and for the correction
256 # We will make sure that the provisonal and the final PBC selections match
257 # Since this is proviosonal we will make it silent
258 provisional_pbc_selection = self._set_pbc_selection(provisional_structure, verbose=False)
260 # --- IMAGING AND FITTING ------------------------------------------------------------
262 # There is no logical way to know if the trajectory is already imaged or it must be imaged
263 # We rely exclusively in input flags
264 must_image = image or fit
266 # Set output filenames for the already filtered structure and trajectory
267 if must_image:
268 imaged_structure_file = File(f'{output_directory}/{IMAGED_STRUCTURE}')
269 imaged_trajectory_file = File(f'{output_directory}/{IMAGED_TRAJECTORY}')
270 else:
271 imaged_structure_file = filtered_structure_file
272 imaged_trajectory_file = filtered_trajectory_file
274 # Set an intermeidate file for the trajectory while it is being imaged
275 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while imaging
276 incompleted_imaged_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + IMAGED_TRAJECTORY}'
277 incompleted_imaged_trajectory_file = File(incompleted_imaged_trajectory_filepath)
278 # If there is an incomplete trajectory then remove it
279 if incompleted_imaged_trajectory_file.exists:
280 incompleted_imaged_trajectory_file.remove()
282 # Check if any output file is missing
283 missing_imaged_output = not imaged_structure_file.exists or not imaged_trajectory_file.exists
285 # Check if parameters have changed
286 # Note that for this step the filter parameters is also important
287 previous_imaged_parameters = self.cache.retrieve(IMAGED)
288 same_imaged_parameters = previous_imaged_parameters == [image, fit, *translation]
290 # Image the trajectory if it is required
291 # i.e. make the trajectory uniform avoiding atom jumps and making molecules to stay whole
292 # Fit the trajectory by removing the translation and rotation if it is required
293 if must_image and (missing_imaged_output or not same_imaged_parameters):
294 print(' * Imaging and fitting')
295 image_and_fit(
296 input_structure_file = filtered_structure_file,
297 input_trajectory_file = filtered_trajectory_file,
298 input_topology_file = filtered_topology_file, # This is optional if there are no PBC residues
299 output_structure_file = imaged_structure_file,
300 output_trajectory_file = incompleted_imaged_trajectory_file,
301 image = image,
302 fit = fit,
303 translation = translation,
304 structure = provisional_structure,
305 pbc_selection = provisional_pbc_selection
306 )
307 # Once imaged, rename the trajectory file as completed
308 # If the imaged trajectory already exists then it means it is the input trajectory
309 if imaged_trajectory_file.exists:
310 incompleted_imaged_trajectory_file.remove()
311 else:
312 incompleted_imaged_trajectory_file.rename_to(imaged_trajectory_file)
313 # Update the cache
314 self.cache.update(IMAGED, [image, fit, *translation])
315 # Update the provisional strucutre coordinates
316 imaged_structure = Structure.from_pdb_file(imaged_structure_file.path)
317 imaged_structure_coords = [ atom.coords for atom in imaged_structure.atoms ]
318 provisional_structure.set_new_coordinates(imaged_structure_coords)
320 # --- CORRECTING STRUCTURE ------------------------------------------------------------
322 # Note that this step, although it is foucsed in the structure, requires also the trajectory
323 # Also the trajectory may be altered in very rare cases where coordinates must be resorted
325 # There is no possible reason to not correct the structure
326 # This is the last step so the output files will be named as the output files of the whole processing
328 # WARNING:
329 # For the correcting function we need the number of snapshots and at this point it should not be defined
330 # Snapshots are calculated by default from the already processed structure and trajectory
331 # For this reason we can not rely on the public snapshots getter
332 # We must calculate snapshots here using last step structure and trajectory
333 snapshots = None
334 # If we already have a value in the cache then use it, unless the input trajectory has changed
335 same_trajectory = 'input_trajectory_files' not in task.changed_inputs
336 if same_trajectory: snapshots = self.cache.retrieve(SNAPSHOTS_FLAG)
337 # Calculate the new value
338 if snapshots is None:
339 snapshots = get_frames_count(imaged_structure_file, imaged_trajectory_file)
340 # Update the MD task snapshots value
341 self.get_snapshots.prefill(self, snapshots, {
342 'structure_file': imaged_structure_file,
343 'trajectory_file': imaged_trajectory_file
344 })
345 # Save the snapshots value in the cache as well
346 self.cache.update(SNAPSHOTS_FLAG, snapshots)
348 # WARNING:
349 # We may need to resort atoms in the structure corrector function
350 # In such case, bonds and charges must be resorted as well and saved apart to keep values coherent
351 # Bonds are calculated during the structure corrector but atom charges must be extracted no
352 charges = get_charges(filtered_topology_file)
353 self.project.get_charges.prefill(self.project, charges, {
354 'topology_file': filtered_topology_file
355 })
357 print(' * Correcting structure')
359 # Set output filenames for the already filtered structure and trajectory
360 corrected_structure_filepath = f'{output_directory}/{CORRECTED_STRUCTURE}'
361 corrected_structure_file = File(corrected_structure_filepath)
362 corrected_trajectory_filepath = f'{output_directory}/{CORRECTED_TRAJECTORY}'
363 corrected_trajectory_file = File(corrected_trajectory_filepath)
365 # Correct the structure
366 # This function reads and or modifies the following MD variables:
367 # snapshots, reference_bonds, register, cache, mercy, trust
368 structure_corrector(
369 structure = provisional_structure,
370 input_trajectory_file = imaged_trajectory_file,
371 input_topology_file = filtered_topology_file,
372 output_structure_file = corrected_structure_file,
373 output_trajectory_file = corrected_trajectory_file,
374 MD = self,
375 pbc_selection = provisional_pbc_selection,
376 snapshots = snapshots,
377 register = self.register,
378 mercy = self.project.mercy,
379 trust = self.project.trust,
380 guess_bonds = self.project.guess_bonds,
381 ignore_bonds = self.project.ignore_bonds,
382 )
384 # If the corrected output exists then use it
385 # Otherwise use the previous step files
386 # Corrected files are generated only when changes are made in these files
387 corrected_structure_file = corrected_structure_file if corrected_structure_file.exists else imaged_structure_file
388 corrected_trajectory_file = corrected_trajectory_file if corrected_trajectory_file.exists else imaged_trajectory_file
390 # Set for every type of file (structure, trajectory and topology) the input, the last processed step and the output files
391 input_and_output_files = [
392 (input_structure_file, corrected_structure_file, output_structure_file),
393 (input_trajectory_files[0], corrected_trajectory_file, output_trajectory_file),
394 (input_topology_file, filtered_topology_file, output_topology_file)
395 ]
396 # Set a list of intermediate files
397 intermediate_files = set([
398 converted_structure_file, converted_trajectory_file,
399 filtered_structure_file, filtered_trajectory_file,
400 imaged_structure_file, imaged_trajectory_file,
401 ])
402 # Now we must rename files to match the output file
403 # Processed files remain with some intermediate filename
404 for input_file, processed_file, output_file in input_and_output_files:
405 # If the processed file is already the output file then there is nothing to do here
406 # This means it was already the input file and no changes were made
407 if processed_file == output_file: continue
408 # There is a chance that the input files have not been modified
409 # This means the input format has already the output format and it is not to be imaged, fitted or corrected
410 # However we need the output files to exist and we dont want to rename the original ones to conserve them
411 # In order to not duplicate data, we will setup a symbolic link to the input files with the output filepaths
412 if processed_file == input_file:
413 # If output file exists and its the same as the input file, we can not create a symlink from a file to the same file
414 if output_file.exists:
415 output_file.remove()
416 output_file.set_symlink_to(input_file)
417 # Otherwise rename the last intermediate file as the output file
418 else:
419 # In case the processed file is a symlink we must make sure the symlink is not made to a intermediate step
420 # Intermediate steps will be removed further and thus the symlink would break
421 # If the symlinks points to the input file there is no problem though
422 if processed_file.is_symlink():
423 target_file = processed_file.get_symlink()
424 if target_file in intermediate_files:
425 target_file.rename_to(output_file)
426 else:
427 processed_file.rename_to(output_file)
428 # If the files is not a symlink then simply rename it
429 else:
430 processed_file.rename_to(output_file)
432 # Save the internal variables
433 self._structure_file = output_structure_file
434 self._trajectory_file = output_trajectory_file
435 self.project._topology_file = output_topology_file
437 # If the input and output file have the same name then overwrite input cksums
438 # Thus we avoid this task to run forever
439 # DANI: Esto es provisional, hay que prohibir que los inputs se llamen como los outputs
440 if input_structure_file == output_structure_file:
441 task.cache_cksums['input_structure_file'] = get_cksum_id(output_structure_file)
442 if len(input_trajectory_files) == 1 and input_trajectory_files[0] == output_trajectory_file:
443 task.cache_cksums['input_trajectory_files'] = get_cksum_id([ output_trajectory_file ])
444 if input_topology_file == output_topology_file:
445 task.cache_cksums['input_topology_file'] = get_cksum_id(output_topology_file)
447 # --- Definitive PBC selection ---
449 # Now that we have the corrected structure we can set the definitive PBC atoms
450 # Make sure the selection is identical to the provisional selection
451 if self.pbc_selection != provisional_pbc_selection:
452 raise InputError('PBC selection is not consistent after correcting the structure. '
453 'Please consider using a different PBC selection. '
454 'Avoid relying in atom distances or elements to avoid this problem.')
456 # --- RUNNING FINAL TESTS ------------------------------------------------------------
458 # Note that tests here do not modify any file
460 # Check the trajectory has not sudden jumps
461 self.is_trajectory_integral()
463 # Make a final test summary
464 self.print_tests_summary()
466 # Issue some warnings if failed or never run tests are skipped
467 self._issue_required_test_warnings()
469 # --- Cleanup intermediate files
471 # Set a list of input files to NOT be removed
472 inputs_files = set([ input_structure_file, *input_trajectory_files, input_topology_file ])
473 # We must make sure an intermediate file is not actually an input file before deleting it
474 removable_files = intermediate_files - inputs_files
475 # Now delete every removable file
476 for removable_file in removable_files:
477 # Note that a broken symlink does not 'exists'
478 if removable_file.exists or removable_file.is_symlink():
479 removable_file.remove()