Coverage for model_workflow/tools/process_input_files.py: 82%

157 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1from os import rename 

2 

3from model_workflow.utils.auxiliar import InputError, MISSING_TOPOLOGY, warn 

4from model_workflow.utils.constants import STRUCTURE_FILENAME, TRAJECTORY_FILENAME 

5from model_workflow.utils.constants import CONVERTED_STRUCTURE, CONVERTED_TRAJECTORY 

6from model_workflow.utils.constants import FILTERED, FILTERED_STRUCTURE, FILTERED_TRAJECTORY 

7from model_workflow.utils.constants import IMAGED, IMAGED_STRUCTURE, IMAGED_TRAJECTORY 

8from model_workflow.utils.constants import CORRECTED_STRUCTURE, CORRECTED_TRAJECTORY 

9from model_workflow.utils.constants import INCOMPLETE_PREFIX, CG_ATOM_ELEMENT, SNAPSHOTS_FLAG 

10from model_workflow.utils.file import File 

11from model_workflow.utils.structures import Structure 

12from model_workflow.utils.pyt_spells import get_frames_count 

13from model_workflow.utils.arg_cksum import get_cksum_id 

14from model_workflow.utils.type_hints import * 

15 

16from model_workflow.tools.check_inputs import check_inputs, PREFILTERED_TOPOLOGY_EXCEPTION 

17from model_workflow.tools.conversions import convert 

18from model_workflow.tools.filter_atoms import filter_atoms 

19from model_workflow.tools.image_and_fit import image_and_fit 

20from model_workflow.tools.get_charges import get_charges 

21from model_workflow.tools.structure_corrector import structure_corrector 

22 

23 

24def process_input_files ( 

25 input_structure_file : 'File', 

26 input_trajectory_files : 'File', 

27 input_topology_file : 'File', 

28 topology_filepath : str, 

29 # Processing parameters 

30 filter_selection : str, 

31 image : bool, 

32 fit : bool, 

33 translation : Tuple, 

34 # Make sure the MD is used only to set values or use its functions, but not to get values 

35 # Values msut be passed separatedly as inputs so the taks can identify when inputs change 

36 self : 'MD', 

37 # Get the task which is calling this function 

38 # Thus we may knwo which inputs have changed compared to previous runs 

39 task : 'Task', 

40): 

41 """Process input files to generate the processed files. 

42 This process corrects and standarizes the topology, the trajectory and the structure.""" 

43 

44 # Make sure we do not enter in a loop 

45 # This may happen when we read/call an output value/file by mistake 

46 if hasattr(self, '_processed'): raise RuntimeError('Looped processing') 

47 self._processed = True 

48 

49 # Input trajectories should have all the same format 

50 input_trajectory_formats = set([ trajectory_file.format for trajectory_file in input_trajectory_files ]) 

51 if len(input_trajectory_formats) > 1: 

52 raise InputError('All input trajectory files must have the same format') 

53 

54 # Set the output filepaths 

55 output_structure_filepath = self.pathify(STRUCTURE_FILENAME) 

56 output_structure_file = File(output_structure_filepath) 

57 output_trajectory_filepath = self.pathify(TRAJECTORY_FILENAME) 

58 output_trajectory_file = File(output_trajectory_filepath) 

59 output_topology_filepath = topology_filepath 

60 output_topology_file = File(output_topology_filepath) if output_topology_filepath else MISSING_TOPOLOGY 

61 

62 # --- TOPOLOGY FORMAT ASSUMTION --------------------------------------------------------- 

63 

64 # Make a super fast check and an assumption 

65 # Topologies with the .top extension for us are considered Gromacs topology format 

66 # However it is usual than Amber topologies (ideally .prmtop) are also '.top' 

67 # So if the trajectory is Amber and the topology is .top then assume it is Amber 

68 input_trajectories_format = list(input_trajectory_formats)[0] 

69 if input_topology_file != MISSING_TOPOLOGY and \ 

70 input_topology_file.extension == 'top' and input_trajectories_format == 'nc': 

71 # Creating a topology symlink/copy with the correct extension 

72 warn(f'Topology is .top but the trajectory is from Amber. I assume the topology is .prmtop') 

73 reformatted_topology_file = input_topology_file.reformat('prmtop') 

74 if input_structure_file == input_topology_file: 

75 input_structure_file = reformatted_topology_file 

76 input_topology_file = reformatted_topology_file 

77 

78 # --- FIRST CHECK ----------------------------------------------------------------------- 

79 

80 # Check input files match in number of atoms 

81 # Here we have not standarized the format so we must check differently with every format 

82 exceptions = check_inputs(input_structure_file, input_trajectory_files, input_topology_file) 

83 

84 # There is a chance theat the inputs checker has prefiltered the topology to match trajectory 

85 # If this is the case then use the prefiltered topology from now on 

86 prefiltered_topology = exceptions.get(PREFILTERED_TOPOLOGY_EXCEPTION, None) 

87 if prefiltered_topology: 

88 if input_structure_file == input_topology_file: 

89 input_structure_file = prefiltered_topology 

90 input_topology_file = prefiltered_topology 

91 

92 # --- CONVERTING AND MERGING ------------------------------------------------------------ 

93 

94 # Set the output format for the already converted structure 

95 input_structure_format = input_structure_file.format 

96 output_structure_format = output_structure_file.format 

97 converted_structure_filepath = self.pathify(CONVERTED_STRUCTURE) 

98 # If input structure already matches the output format then avoid the renaming 

99 if input_structure_format == output_structure_format: 

100 converted_structure_filepath = input_structure_file.path 

101 # Set the output file for the already converted structure 

102 converted_structure_file = File(converted_structure_filepath) 

103 # Set the output format for the already converted trajectory 

104 input_trajectories_format = list(input_trajectory_formats)[0] 

105 output_trajectory_format = output_trajectory_file.format 

106 # Set the output file for the already converted trajectory 

107 converted_trajectory_filepath = self.pathify(CONVERTED_TRAJECTORY) 

108 # If input trajectory already matches the output format and is unique then avoid the renaming 

109 if input_trajectories_format == output_trajectory_format and len(input_trajectory_files) == 1: 

110 converted_trajectory_filepath = input_trajectory_files[0].path 

111 converted_trajectory_file = File(converted_trajectory_filepath) 

112 # Join all input trajectory paths 

113 input_trajectory_paths = [ trajectory_file.path for trajectory_file in input_trajectory_files ] 

114 

115 # Set an intermeidate file for the trajectory while it is being converted 

116 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while converting 

117 incompleted_converted_trajectory_filepath = self.pathify(INCOMPLETE_PREFIX + CONVERTED_TRAJECTORY) 

118 incompleted_converted_trajectory_file = File(incompleted_converted_trajectory_filepath) 

119 # If there is an incomplete trajectory then remove it 

120 if incompleted_converted_trajectory_file.exists: 

121 incompleted_converted_trajectory_file.remove() 

122 

123 # Convert input structure and trajectories to output structure and trajectory 

124 if not converted_structure_file.exists or not converted_trajectory_file.exists: 

125 print(' * Converting and merging') 

126 convert( 

127 input_structure_filepath = input_structure_file.path, 

128 output_structure_filepath = converted_structure_file.path, 

129 input_trajectory_filepaths = input_trajectory_paths, 

130 output_trajectory_filepath = incompleted_converted_trajectory_file.path, 

131 ) 

132 # Once converted, rename the trajectory file as completed 

133 rename(incompleted_converted_trajectory_file.path, converted_trajectory_file.path) 

134 

135 # Topologies are never converted, but they are kept in their original format 

136 

137 # --- provisional reference structure --- 

138 

139 # Now that we MUST have a PDB file we can set a provisional structure instance 

140 # Note that this structure is not yet corrected so it must be used with care 

141 # Otherwise we could have silent errors 

142 provisional_structure = Structure.from_pdb_file(converted_structure_file.path) 

143 # Now we can set a provisional coarse grain selection 

144 # This selection is useful to avoid problems with CG atom elements 

145 # Since this is proviosonal we will make it silent 

146 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False) 

147 for atom_index in provisional_cg_selection.atom_indices: 

148 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT 

149 

150 # --- FILTERING ATOMS ------------------------------------------------------------ 

151 

152 # Find out if we need to filter 

153 # i.e. check if there is a selection filter and it matches some atoms 

154 must_filter = bool(filter_selection) 

155 

156 # Set output filenames for the already filtered structure and trajectory 

157 # Note that this is the only step affecting topology and thus here we output the definitive topology 

158 filtered_structure_file = File(self.pathify(FILTERED_STRUCTURE)) if must_filter else converted_structure_file 

159 filtered_trajectory_file = File(self.pathify(FILTERED_TRAJECTORY)) if must_filter else converted_trajectory_file 

160 filtered_topology_file = output_topology_file if must_filter else input_topology_file 

161 

162 # Set an intermeidate file for the trajectory while it is being filtered 

163 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while filtering 

164 incompleted_filtered_trajectory_filepath = self.pathify(INCOMPLETE_PREFIX + FILTERED_TRAJECTORY) 

165 incompleted_filtered_trajectory_file = File(incompleted_filtered_trajectory_filepath) 

166 # If there is an incomplete trajectory then remove it 

167 if incompleted_filtered_trajectory_file.exists: 

168 incompleted_filtered_trajectory_file.remove() 

169 

170 # Check if any output file is missing 

171 missing_filter_output = not filtered_structure_file.exists or not filtered_trajectory_file.exists 

172 

173 # Check if parameters have changed 

174 # Note that for this specific step only filtering is important 

175 previous_filtered_parameters = self.cache.retrieve(FILTERED) 

176 same_filtered_parameters = previous_filtered_parameters == filter_selection 

177 

178 # Filter atoms in structure, trajectory and topology if required and not done yet 

179 if must_filter and (missing_filter_output or not same_filtered_parameters): 

180 print(' * Filtering atoms') 

181 filter_atoms( 

182 input_structure_file = converted_structure_file, 

183 input_trajectory_file = converted_trajectory_file, 

184 input_topology_file = input_topology_file, # We use input topology 

185 output_structure_file = filtered_structure_file, 

186 output_trajectory_file = incompleted_filtered_trajectory_file, 

187 output_topology_file = filtered_topology_file, # We genereate the definitive topology 

188 reference_structure = provisional_structure, 

189 filter_selection = filter_selection, 

190 ) 

191 # Once filetered, rename the trajectory file as completed 

192 rename(incompleted_filtered_trajectory_file.path, filtered_trajectory_file.path) 

193 # Update the cache 

194 self.cache.update(FILTERED, filter_selection) 

195 

196 # --- provisional reference structure --- 

197 

198 # Now that we have a filtered PDB file we have to update provisional structure instance 

199 # Note that this structure is not yet corrected so it must be used with care 

200 # Otherwise we could have silent errors 

201 provisional_structure = Structure.from_pdb_file(filtered_structure_file.path) 

202 # Again, set the coarse grain atoms 

203 # Since elements may be needed to guess PBC selection we must solve them right before 

204 # Since this is proviosonal we will make it silent 

205 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False) 

206 for atom_index in provisional_cg_selection.atom_indices: 

207 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT 

208 # Also we can set a provisional PBC selection 

209 # This selection is useful both for imaging/fitting and for the correction 

210 # We will make sure that the provisonal and the final PBC selections match 

211 # Since this is proviosonal we will make it silent 

212 provisional_pbc_selection = self._set_pbc_selection(provisional_structure, verbose=False) 

213 

214 # --- IMAGING AND FITTING ------------------------------------------------------------ 

215 

216 # There is no logical way to know if the trajectory is already imaged or it must be imaged 

217 # We rely exclusively in input flags 

218 must_image = image or fit 

219 

220 # Set output filenames for the already filtered structure and trajectory 

221 imaged_structure_file = File(self.pathify(IMAGED_STRUCTURE)) if must_image else filtered_structure_file 

222 imaged_trajectory_file = File(self.pathify(IMAGED_TRAJECTORY)) if must_image else filtered_trajectory_file 

223 

224 # Set an intermeidate file for the trajectory while it is being imaged 

225 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while imaging 

226 incompleted_imaged_trajectory_filepath = self.pathify(INCOMPLETE_PREFIX + IMAGED_TRAJECTORY) 

227 incompleted_imaged_trajectory_file = File(incompleted_imaged_trajectory_filepath) 

228 # If there is an incomplete trajectory then remove it 

229 if incompleted_imaged_trajectory_file.exists: 

230 incompleted_imaged_trajectory_file.remove() 

231 

232 # Check if any output file is missing 

233 missing_imaged_output = not imaged_structure_file.exists or not imaged_trajectory_file.exists 

234 

235 # Check if parameters have changed 

236 # Note that for this step the filter parameters is also important 

237 previous_imaged_parameters = self.cache.retrieve(IMAGED) 

238 same_imaged_parameters = previous_imaged_parameters == (image, fit, *translation) 

239 

240 # Image the trajectory if it is required 

241 # i.e. make the trajectory uniform avoiding atom jumps and making molecules to stay whole 

242 # Fit the trajectory by removing the translation and rotation if it is required 

243 if must_image and (missing_imaged_output or not same_imaged_parameters): 

244 print(' * Imaging and fitting') 

245 image_and_fit( 

246 input_structure_file = filtered_structure_file, 

247 input_trajectory_file = filtered_trajectory_file, 

248 input_topology_file = filtered_topology_file, # This is optional if there are no PBC residues 

249 output_structure_file = imaged_structure_file, 

250 output_trajectory_file = incompleted_imaged_trajectory_file, 

251 image = image, 

252 fit = fit, 

253 translation = translation, 

254 structure = provisional_structure, 

255 pbc_selection = provisional_pbc_selection 

256 ) 

257 # Once imaged, rename the trajectory file as completed 

258 rename(incompleted_imaged_trajectory_file.path, imaged_trajectory_file.path) 

259 # Update the cache 

260 self.cache.update(IMAGED, (image, fit, *translation)) 

261 # Update the provisional strucutre coordinates 

262 imaged_structure = Structure.from_pdb_file(imaged_structure_file.path) 

263 imaged_structure_coords = [ atom.coords for atom in imaged_structure.atoms ] 

264 provisional_structure.set_new_coordinates(imaged_structure_coords) 

265 

266 # --- CORRECTING STRUCTURE ------------------------------------------------------------ 

267 

268 # Note that this step, although it is foucsed in the structure, requires also the trajectory 

269 # Also the trajectory may be altered in very rare cases where coordinates must be resorted 

270 

271 # There is no possible reason to not correct the structure 

272 # This is the last step so the output files will be named as the output files of the whole processing 

273 

274 # WARNING: 

275 # For the correcting function we need the number of snapshots and at this point it should not be defined 

276 # Snapshots are calculated by default from the already processed structure and trajectory 

277 # For this reason we can not rely on the public snapshots getter 

278 # We must calculate snapshots here using last step structure and trajectory 

279 snapshots = None 

280 # If we already have a value in the cache then use it, unless the input trajectory has changed 

281 same_trajectory = 'input_trajectory_files' not in task.changed_inputs 

282 if same_trajectory: snapshots = self.cache.retrieve(SNAPSHOTS_FLAG) 

283 # Calculate the new value 

284 if snapshots == None: 

285 snapshots = get_frames_count(imaged_structure_file, imaged_trajectory_file) 

286 # Update the MD snapshots value 

287 self.get_snapshots._set_parent_output(self, snapshots) 

288 # Save the snapshots value in the cache as well 

289 self.cache.update(SNAPSHOTS_FLAG, snapshots) 

290 

291 # WARNING: 

292 # We may need to resort atoms in the structure corrector function 

293 # In such case, bonds and charges must be resorted as well and saved apart to keep values coherent 

294 # Bonds are calculated during the structure corrector but atom charges must be extracted no 

295 charges = get_charges(filtered_topology_file) 

296 self.project.get_charges._set_parent_output(self.project, charges) 

297 

298 print(' * Correcting structure') 

299 

300 # Set output filenames for the already filtered structure and trajectory 

301 corrected_structure_file = File(self.pathify(CORRECTED_STRUCTURE)) 

302 corrected_trajectory_file = File(self.pathify(CORRECTED_TRAJECTORY)) 

303 

304 # Correct the structure 

305 # This function reads and or modifies the following MD variables: 

306 # snapshots, reference_bonds, register, cache, mercy, trust 

307 structure_corrector( 

308 structure = provisional_structure, 

309 input_trajectory_file = imaged_trajectory_file, 

310 input_topology_file = filtered_topology_file, 

311 output_structure_file = corrected_structure_file, 

312 output_trajectory_file = corrected_trajectory_file, 

313 MD = self, 

314 pbc_selection = provisional_pbc_selection, 

315 snapshots = snapshots, 

316 register = self.register, 

317 mercy = self.project.mercy, 

318 trust = self.project.trust 

319 ) 

320 

321 # If the corrected output exists then use it 

322 # Otherwise use the previous step files 

323 # Corrected files are generated only when changes are made in these files 

324 corrected_structure_file = corrected_structure_file if corrected_structure_file.exists else imaged_structure_file 

325 corrected_trajectory_file = corrected_trajectory_file if corrected_trajectory_file.exists else imaged_trajectory_file 

326 

327 # Set for every type of file (structure, trajectory and topology) the input, the last processed step and the output files 

328 input_and_output_files = [ 

329 (input_structure_file, corrected_structure_file, output_structure_file), 

330 (input_trajectory_files[0], corrected_trajectory_file, output_trajectory_file), 

331 (input_topology_file, filtered_topology_file, output_topology_file) 

332 ] 

333 # Set a list of intermediate files 

334 intermediate_files = set([ 

335 converted_structure_file, converted_trajectory_file, 

336 filtered_structure_file, filtered_trajectory_file, 

337 imaged_structure_file, imaged_trajectory_file, 

338 ]) 

339 # Now we must rename files to match the output file 

340 # Processed files remain with some intermediate filename 

341 for input_file, processed_file, output_file in input_and_output_files: 

342 # If the processed file is already the output file then there is nothing to do here 

343 # This means it was already the input file and no changes were made 

344 if processed_file == output_file: 

345 continue 

346 # There is a chance that the input files have not been modified 

347 # This means the input format has already the output format and it is not to be imaged, fitted or corrected 

348 # However we need the output files to exist and we dont want to rename the original ones to conserve them 

349 # In order to not duplicate data, we will setup a symbolic link to the input files with the output filepaths 

350 if processed_file == input_file: 

351 # If output file exists and its the same as the input file, we can not create a symlink from a file to the same file 

352 if output_file.exists: 

353 output_file.remove() 

354 output_file.set_symlink_to(input_file) 

355 # Otherwise rename the last intermediate file as the output file 

356 else: 

357 # In case the processed file is a symlink we must make sure the symlink is not made to a intermediate step 

358 # Intermediate steps will be removed further and thus the symlink would break 

359 # If the symlinks points to the input file there is no problem though 

360 if processed_file.is_symlink(): 

361 target_file = processed_file.get_symlink() 

362 if target_file in intermediate_files: 

363 target_file.rename_to(output_file) 

364 else: 

365 processed_file.rename_to(output_file) 

366 # If the files is not a symlink then simply rename it 

367 else: 

368 processed_file.rename_to(output_file) 

369 

370 # Save the internal variables 

371 self._structure_file = output_structure_file 

372 self._trajectory_file = output_trajectory_file 

373 self.project._topology_file = output_topology_file 

374 

375 # If the input and outpu file have the same name then overwrite input cksums 

376 # Thus we avoid this task to run forever 

377 # DANI: Esto es provisional, hay que prohibir que los inputs se llamen como los outputs 

378 if input_structure_file == output_structure_file: 

379 task.cache_cksums['input_structure_file'] = output_structure_file.get_cksum() 

380 if len(input_trajectory_files) == 1 and input_trajectory_files[0] == output_trajectory_file: 

381 task.cache_cksums['input_trajectory_files'] = get_cksum_id([ output_trajectory_file ]) 

382 if input_topology_file == output_topology_file: 

383 task.cache_cksums['input_topology_file'] = str(MISSING_TOPOLOGY) if output_topology_file == MISSING_TOPOLOGY else output_topology_file.get_cksum() 

384 

385 # --- Definitive PBC selection --- 

386 

387 # Now that we have the corrected structure we can set the definitive PBC atoms 

388 # Make sure the selection is identical to the provisional selection 

389 if self.pbc_selection != provisional_pbc_selection: 

390 raise InputError('PBC selection is not consistent after correcting the structure. ' 

391 'Please consider using a different PBC selection. ' 

392 'Avoid relying in atom distances or elements to avoid this problem.') 

393 

394 # --- RUNNING FINAL TESTS ------------------------------------------------------------ 

395 

396 # Note that some tests have been run already 

397 # e.g. stable bonds is run in the structure corrector function 

398 

399 # Note that tests here do not modify any file 

400 

401 # Check the trajectory has not sudden jumps 

402 self.is_trajectory_integral() 

403 

404 # Make a final test summary 

405 self.print_tests_summary() 

406 

407 # Issue some warnings if failed or never run tests are skipped 

408 self._issue_required_test_warnings 

409 

410 # --- Cleanup intermediate files 

411 

412 # Set a list of input files to NOT be removed 

413 inputs_files = set([ input_structure_file, *input_trajectory_files, input_topology_file ]) 

414 # We must make sure an intermediate file is not actually an input file before deleting it 

415 removable_files = intermediate_files - inputs_files 

416 # Now delete every removable file 

417 for removable_file in removable_files: 

418 # Note that a broken symlink does not 'exists' 

419 if removable_file.exists or removable_file.is_symlink(): 

420 removable_file.remove()