Coverage for model_workflow/tools/process_input_files.py: 82%
157 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1from os import rename
3from model_workflow.utils.auxiliar import InputError, MISSING_TOPOLOGY, warn
4from model_workflow.utils.constants import STRUCTURE_FILENAME, TRAJECTORY_FILENAME
5from model_workflow.utils.constants import CONVERTED_STRUCTURE, CONVERTED_TRAJECTORY
6from model_workflow.utils.constants import FILTERED, FILTERED_STRUCTURE, FILTERED_TRAJECTORY
7from model_workflow.utils.constants import IMAGED, IMAGED_STRUCTURE, IMAGED_TRAJECTORY
8from model_workflow.utils.constants import CORRECTED_STRUCTURE, CORRECTED_TRAJECTORY
9from model_workflow.utils.constants import INCOMPLETE_PREFIX, CG_ATOM_ELEMENT, SNAPSHOTS_FLAG
10from model_workflow.utils.file import File
11from model_workflow.utils.structures import Structure
12from model_workflow.utils.pyt_spells import get_frames_count
13from model_workflow.utils.arg_cksum import get_cksum_id
14from model_workflow.utils.type_hints import *
16from model_workflow.tools.check_inputs import check_inputs, PREFILTERED_TOPOLOGY_EXCEPTION
17from model_workflow.tools.conversions import convert
18from model_workflow.tools.filter_atoms import filter_atoms
19from model_workflow.tools.image_and_fit import image_and_fit
20from model_workflow.tools.get_charges import get_charges
21from model_workflow.tools.structure_corrector import structure_corrector
24def process_input_files (
25 input_structure_file : 'File',
26 input_trajectory_files : 'File',
27 input_topology_file : 'File',
28 topology_filepath : str,
29 # Processing parameters
30 filter_selection : str,
31 image : bool,
32 fit : bool,
33 translation : Tuple,
34 # Make sure the MD is used only to set values or use its functions, but not to get values
35 # Values msut be passed separatedly as inputs so the taks can identify when inputs change
36 self : 'MD',
37 # Get the task which is calling this function
38 # Thus we may knwo which inputs have changed compared to previous runs
39 task : 'Task',
40):
41 """Process input files to generate the processed files.
42 This process corrects and standarizes the topology, the trajectory and the structure."""
44 # Make sure we do not enter in a loop
45 # This may happen when we read/call an output value/file by mistake
46 if hasattr(self, '_processed'): raise RuntimeError('Looped processing')
47 self._processed = True
49 # Input trajectories should have all the same format
50 input_trajectory_formats = set([ trajectory_file.format for trajectory_file in input_trajectory_files ])
51 if len(input_trajectory_formats) > 1:
52 raise InputError('All input trajectory files must have the same format')
54 # Set the output filepaths
55 output_structure_filepath = self.pathify(STRUCTURE_FILENAME)
56 output_structure_file = File(output_structure_filepath)
57 output_trajectory_filepath = self.pathify(TRAJECTORY_FILENAME)
58 output_trajectory_file = File(output_trajectory_filepath)
59 output_topology_filepath = topology_filepath
60 output_topology_file = File(output_topology_filepath) if output_topology_filepath else MISSING_TOPOLOGY
62 # --- TOPOLOGY FORMAT ASSUMTION ---------------------------------------------------------
64 # Make a super fast check and an assumption
65 # Topologies with the .top extension for us are considered Gromacs topology format
66 # However it is usual than Amber topologies (ideally .prmtop) are also '.top'
67 # So if the trajectory is Amber and the topology is .top then assume it is Amber
68 input_trajectories_format = list(input_trajectory_formats)[0]
69 if input_topology_file != MISSING_TOPOLOGY and \
70 input_topology_file.extension == 'top' and input_trajectories_format == 'nc':
71 # Creating a topology symlink/copy with the correct extension
72 warn(f'Topology is .top but the trajectory is from Amber. I assume the topology is .prmtop')
73 reformatted_topology_file = input_topology_file.reformat('prmtop')
74 if input_structure_file == input_topology_file:
75 input_structure_file = reformatted_topology_file
76 input_topology_file = reformatted_topology_file
78 # --- FIRST CHECK -----------------------------------------------------------------------
80 # Check input files match in number of atoms
81 # Here we have not standarized the format so we must check differently with every format
82 exceptions = check_inputs(input_structure_file, input_trajectory_files, input_topology_file)
84 # There is a chance theat the inputs checker has prefiltered the topology to match trajectory
85 # If this is the case then use the prefiltered topology from now on
86 prefiltered_topology = exceptions.get(PREFILTERED_TOPOLOGY_EXCEPTION, None)
87 if prefiltered_topology:
88 if input_structure_file == input_topology_file:
89 input_structure_file = prefiltered_topology
90 input_topology_file = prefiltered_topology
92 # --- CONVERTING AND MERGING ------------------------------------------------------------
94 # Set the output format for the already converted structure
95 input_structure_format = input_structure_file.format
96 output_structure_format = output_structure_file.format
97 converted_structure_filepath = self.pathify(CONVERTED_STRUCTURE)
98 # If input structure already matches the output format then avoid the renaming
99 if input_structure_format == output_structure_format:
100 converted_structure_filepath = input_structure_file.path
101 # Set the output file for the already converted structure
102 converted_structure_file = File(converted_structure_filepath)
103 # Set the output format for the already converted trajectory
104 input_trajectories_format = list(input_trajectory_formats)[0]
105 output_trajectory_format = output_trajectory_file.format
106 # Set the output file for the already converted trajectory
107 converted_trajectory_filepath = self.pathify(CONVERTED_TRAJECTORY)
108 # If input trajectory already matches the output format and is unique then avoid the renaming
109 if input_trajectories_format == output_trajectory_format and len(input_trajectory_files) == 1:
110 converted_trajectory_filepath = input_trajectory_files[0].path
111 converted_trajectory_file = File(converted_trajectory_filepath)
112 # Join all input trajectory paths
113 input_trajectory_paths = [ trajectory_file.path for trajectory_file in input_trajectory_files ]
115 # Set an intermeidate file for the trajectory while it is being converted
116 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while converting
117 incompleted_converted_trajectory_filepath = self.pathify(INCOMPLETE_PREFIX + CONVERTED_TRAJECTORY)
118 incompleted_converted_trajectory_file = File(incompleted_converted_trajectory_filepath)
119 # If there is an incomplete trajectory then remove it
120 if incompleted_converted_trajectory_file.exists:
121 incompleted_converted_trajectory_file.remove()
123 # Convert input structure and trajectories to output structure and trajectory
124 if not converted_structure_file.exists or not converted_trajectory_file.exists:
125 print(' * Converting and merging')
126 convert(
127 input_structure_filepath = input_structure_file.path,
128 output_structure_filepath = converted_structure_file.path,
129 input_trajectory_filepaths = input_trajectory_paths,
130 output_trajectory_filepath = incompleted_converted_trajectory_file.path,
131 )
132 # Once converted, rename the trajectory file as completed
133 rename(incompleted_converted_trajectory_file.path, converted_trajectory_file.path)
135 # Topologies are never converted, but they are kept in their original format
137 # --- provisional reference structure ---
139 # Now that we MUST have a PDB file we can set a provisional structure instance
140 # Note that this structure is not yet corrected so it must be used with care
141 # Otherwise we could have silent errors
142 provisional_structure = Structure.from_pdb_file(converted_structure_file.path)
143 # Now we can set a provisional coarse grain selection
144 # This selection is useful to avoid problems with CG atom elements
145 # Since this is proviosonal we will make it silent
146 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False)
147 for atom_index in provisional_cg_selection.atom_indices:
148 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT
150 # --- FILTERING ATOMS ------------------------------------------------------------
152 # Find out if we need to filter
153 # i.e. check if there is a selection filter and it matches some atoms
154 must_filter = bool(filter_selection)
156 # Set output filenames for the already filtered structure and trajectory
157 # Note that this is the only step affecting topology and thus here we output the definitive topology
158 filtered_structure_file = File(self.pathify(FILTERED_STRUCTURE)) if must_filter else converted_structure_file
159 filtered_trajectory_file = File(self.pathify(FILTERED_TRAJECTORY)) if must_filter else converted_trajectory_file
160 filtered_topology_file = output_topology_file if must_filter else input_topology_file
162 # Set an intermeidate file for the trajectory while it is being filtered
163 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while filtering
164 incompleted_filtered_trajectory_filepath = self.pathify(INCOMPLETE_PREFIX + FILTERED_TRAJECTORY)
165 incompleted_filtered_trajectory_file = File(incompleted_filtered_trajectory_filepath)
166 # If there is an incomplete trajectory then remove it
167 if incompleted_filtered_trajectory_file.exists:
168 incompleted_filtered_trajectory_file.remove()
170 # Check if any output file is missing
171 missing_filter_output = not filtered_structure_file.exists or not filtered_trajectory_file.exists
173 # Check if parameters have changed
174 # Note that for this specific step only filtering is important
175 previous_filtered_parameters = self.cache.retrieve(FILTERED)
176 same_filtered_parameters = previous_filtered_parameters == filter_selection
178 # Filter atoms in structure, trajectory and topology if required and not done yet
179 if must_filter and (missing_filter_output or not same_filtered_parameters):
180 print(' * Filtering atoms')
181 filter_atoms(
182 input_structure_file = converted_structure_file,
183 input_trajectory_file = converted_trajectory_file,
184 input_topology_file = input_topology_file, # We use input topology
185 output_structure_file = filtered_structure_file,
186 output_trajectory_file = incompleted_filtered_trajectory_file,
187 output_topology_file = filtered_topology_file, # We genereate the definitive topology
188 reference_structure = provisional_structure,
189 filter_selection = filter_selection,
190 )
191 # Once filetered, rename the trajectory file as completed
192 rename(incompleted_filtered_trajectory_file.path, filtered_trajectory_file.path)
193 # Update the cache
194 self.cache.update(FILTERED, filter_selection)
196 # --- provisional reference structure ---
198 # Now that we have a filtered PDB file we have to update provisional structure instance
199 # Note that this structure is not yet corrected so it must be used with care
200 # Otherwise we could have silent errors
201 provisional_structure = Structure.from_pdb_file(filtered_structure_file.path)
202 # Again, set the coarse grain atoms
203 # Since elements may be needed to guess PBC selection we must solve them right before
204 # Since this is proviosonal we will make it silent
205 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False)
206 for atom_index in provisional_cg_selection.atom_indices:
207 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT
208 # Also we can set a provisional PBC selection
209 # This selection is useful both for imaging/fitting and for the correction
210 # We will make sure that the provisonal and the final PBC selections match
211 # Since this is proviosonal we will make it silent
212 provisional_pbc_selection = self._set_pbc_selection(provisional_structure, verbose=False)
214 # --- IMAGING AND FITTING ------------------------------------------------------------
216 # There is no logical way to know if the trajectory is already imaged or it must be imaged
217 # We rely exclusively in input flags
218 must_image = image or fit
220 # Set output filenames for the already filtered structure and trajectory
221 imaged_structure_file = File(self.pathify(IMAGED_STRUCTURE)) if must_image else filtered_structure_file
222 imaged_trajectory_file = File(self.pathify(IMAGED_TRAJECTORY)) if must_image else filtered_trajectory_file
224 # Set an intermeidate file for the trajectory while it is being imaged
225 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while imaging
226 incompleted_imaged_trajectory_filepath = self.pathify(INCOMPLETE_PREFIX + IMAGED_TRAJECTORY)
227 incompleted_imaged_trajectory_file = File(incompleted_imaged_trajectory_filepath)
228 # If there is an incomplete trajectory then remove it
229 if incompleted_imaged_trajectory_file.exists:
230 incompleted_imaged_trajectory_file.remove()
232 # Check if any output file is missing
233 missing_imaged_output = not imaged_structure_file.exists or not imaged_trajectory_file.exists
235 # Check if parameters have changed
236 # Note that for this step the filter parameters is also important
237 previous_imaged_parameters = self.cache.retrieve(IMAGED)
238 same_imaged_parameters = previous_imaged_parameters == (image, fit, *translation)
240 # Image the trajectory if it is required
241 # i.e. make the trajectory uniform avoiding atom jumps and making molecules to stay whole
242 # Fit the trajectory by removing the translation and rotation if it is required
243 if must_image and (missing_imaged_output or not same_imaged_parameters):
244 print(' * Imaging and fitting')
245 image_and_fit(
246 input_structure_file = filtered_structure_file,
247 input_trajectory_file = filtered_trajectory_file,
248 input_topology_file = filtered_topology_file, # This is optional if there are no PBC residues
249 output_structure_file = imaged_structure_file,
250 output_trajectory_file = incompleted_imaged_trajectory_file,
251 image = image,
252 fit = fit,
253 translation = translation,
254 structure = provisional_structure,
255 pbc_selection = provisional_pbc_selection
256 )
257 # Once imaged, rename the trajectory file as completed
258 rename(incompleted_imaged_trajectory_file.path, imaged_trajectory_file.path)
259 # Update the cache
260 self.cache.update(IMAGED, (image, fit, *translation))
261 # Update the provisional strucutre coordinates
262 imaged_structure = Structure.from_pdb_file(imaged_structure_file.path)
263 imaged_structure_coords = [ atom.coords for atom in imaged_structure.atoms ]
264 provisional_structure.set_new_coordinates(imaged_structure_coords)
266 # --- CORRECTING STRUCTURE ------------------------------------------------------------
268 # Note that this step, although it is foucsed in the structure, requires also the trajectory
269 # Also the trajectory may be altered in very rare cases where coordinates must be resorted
271 # There is no possible reason to not correct the structure
272 # This is the last step so the output files will be named as the output files of the whole processing
274 # WARNING:
275 # For the correcting function we need the number of snapshots and at this point it should not be defined
276 # Snapshots are calculated by default from the already processed structure and trajectory
277 # For this reason we can not rely on the public snapshots getter
278 # We must calculate snapshots here using last step structure and trajectory
279 snapshots = None
280 # If we already have a value in the cache then use it, unless the input trajectory has changed
281 same_trajectory = 'input_trajectory_files' not in task.changed_inputs
282 if same_trajectory: snapshots = self.cache.retrieve(SNAPSHOTS_FLAG)
283 # Calculate the new value
284 if snapshots == None:
285 snapshots = get_frames_count(imaged_structure_file, imaged_trajectory_file)
286 # Update the MD snapshots value
287 self.get_snapshots._set_parent_output(self, snapshots)
288 # Save the snapshots value in the cache as well
289 self.cache.update(SNAPSHOTS_FLAG, snapshots)
291 # WARNING:
292 # We may need to resort atoms in the structure corrector function
293 # In such case, bonds and charges must be resorted as well and saved apart to keep values coherent
294 # Bonds are calculated during the structure corrector but atom charges must be extracted no
295 charges = get_charges(filtered_topology_file)
296 self.project.get_charges._set_parent_output(self.project, charges)
298 print(' * Correcting structure')
300 # Set output filenames for the already filtered structure and trajectory
301 corrected_structure_file = File(self.pathify(CORRECTED_STRUCTURE))
302 corrected_trajectory_file = File(self.pathify(CORRECTED_TRAJECTORY))
304 # Correct the structure
305 # This function reads and or modifies the following MD variables:
306 # snapshots, reference_bonds, register, cache, mercy, trust
307 structure_corrector(
308 structure = provisional_structure,
309 input_trajectory_file = imaged_trajectory_file,
310 input_topology_file = filtered_topology_file,
311 output_structure_file = corrected_structure_file,
312 output_trajectory_file = corrected_trajectory_file,
313 MD = self,
314 pbc_selection = provisional_pbc_selection,
315 snapshots = snapshots,
316 register = self.register,
317 mercy = self.project.mercy,
318 trust = self.project.trust
319 )
321 # If the corrected output exists then use it
322 # Otherwise use the previous step files
323 # Corrected files are generated only when changes are made in these files
324 corrected_structure_file = corrected_structure_file if corrected_structure_file.exists else imaged_structure_file
325 corrected_trajectory_file = corrected_trajectory_file if corrected_trajectory_file.exists else imaged_trajectory_file
327 # Set for every type of file (structure, trajectory and topology) the input, the last processed step and the output files
328 input_and_output_files = [
329 (input_structure_file, corrected_structure_file, output_structure_file),
330 (input_trajectory_files[0], corrected_trajectory_file, output_trajectory_file),
331 (input_topology_file, filtered_topology_file, output_topology_file)
332 ]
333 # Set a list of intermediate files
334 intermediate_files = set([
335 converted_structure_file, converted_trajectory_file,
336 filtered_structure_file, filtered_trajectory_file,
337 imaged_structure_file, imaged_trajectory_file,
338 ])
339 # Now we must rename files to match the output file
340 # Processed files remain with some intermediate filename
341 for input_file, processed_file, output_file in input_and_output_files:
342 # If the processed file is already the output file then there is nothing to do here
343 # This means it was already the input file and no changes were made
344 if processed_file == output_file:
345 continue
346 # There is a chance that the input files have not been modified
347 # This means the input format has already the output format and it is not to be imaged, fitted or corrected
348 # However we need the output files to exist and we dont want to rename the original ones to conserve them
349 # In order to not duplicate data, we will setup a symbolic link to the input files with the output filepaths
350 if processed_file == input_file:
351 # If output file exists and its the same as the input file, we can not create a symlink from a file to the same file
352 if output_file.exists:
353 output_file.remove()
354 output_file.set_symlink_to(input_file)
355 # Otherwise rename the last intermediate file as the output file
356 else:
357 # In case the processed file is a symlink we must make sure the symlink is not made to a intermediate step
358 # Intermediate steps will be removed further and thus the symlink would break
359 # If the symlinks points to the input file there is no problem though
360 if processed_file.is_symlink():
361 target_file = processed_file.get_symlink()
362 if target_file in intermediate_files:
363 target_file.rename_to(output_file)
364 else:
365 processed_file.rename_to(output_file)
366 # If the files is not a symlink then simply rename it
367 else:
368 processed_file.rename_to(output_file)
370 # Save the internal variables
371 self._structure_file = output_structure_file
372 self._trajectory_file = output_trajectory_file
373 self.project._topology_file = output_topology_file
375 # If the input and outpu file have the same name then overwrite input cksums
376 # Thus we avoid this task to run forever
377 # DANI: Esto es provisional, hay que prohibir que los inputs se llamen como los outputs
378 if input_structure_file == output_structure_file:
379 task.cache_cksums['input_structure_file'] = output_structure_file.get_cksum()
380 if len(input_trajectory_files) == 1 and input_trajectory_files[0] == output_trajectory_file:
381 task.cache_cksums['input_trajectory_files'] = get_cksum_id([ output_trajectory_file ])
382 if input_topology_file == output_topology_file:
383 task.cache_cksums['input_topology_file'] = str(MISSING_TOPOLOGY) if output_topology_file == MISSING_TOPOLOGY else output_topology_file.get_cksum()
385 # --- Definitive PBC selection ---
387 # Now that we have the corrected structure we can set the definitive PBC atoms
388 # Make sure the selection is identical to the provisional selection
389 if self.pbc_selection != provisional_pbc_selection:
390 raise InputError('PBC selection is not consistent after correcting the structure. '
391 'Please consider using a different PBC selection. '
392 'Avoid relying in atom distances or elements to avoid this problem.')
394 # --- RUNNING FINAL TESTS ------------------------------------------------------------
396 # Note that some tests have been run already
397 # e.g. stable bonds is run in the structure corrector function
399 # Note that tests here do not modify any file
401 # Check the trajectory has not sudden jumps
402 self.is_trajectory_integral()
404 # Make a final test summary
405 self.print_tests_summary()
407 # Issue some warnings if failed or never run tests are skipped
408 self._issue_required_test_warnings
410 # --- Cleanup intermediate files
412 # Set a list of input files to NOT be removed
413 inputs_files = set([ input_structure_file, *input_trajectory_files, input_topology_file ])
414 # We must make sure an intermediate file is not actually an input file before deleting it
415 removable_files = intermediate_files - inputs_files
416 # Now delete every removable file
417 for removable_file in removable_files:
418 # Note that a broken symlink does not 'exists'
419 if removable_file.exists or removable_file.is_symlink():
420 removable_file.remove()