Coverage for mddb_workflow / tools / process_input_files.py: 69%

185 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 18:45 +0000

1from mddb_workflow.utils.auxiliar import InputError, MISSING_TOPOLOGY, warn 

2from mddb_workflow.utils.constants import CONVERTED_STRUCTURE, CONVERTED_TRAJECTORY 

3from mddb_workflow.utils.constants import FILTERED, FILTERED_STRUCTURE, FILTERED_TRAJECTORY 

4from mddb_workflow.utils.constants import IMAGED, IMAGED_STRUCTURE, IMAGED_TRAJECTORY 

5from mddb_workflow.utils.constants import CORRECTED_STRUCTURE, CORRECTED_TRAJECTORY 

6from mddb_workflow.utils.constants import INCOMPLETE_PREFIX, CG_ATOM_ELEMENT, SNAPSHOTS_FLAG 

7from mddb_workflow.utils.constants import FAITH_BYPASS 

8from mddb_workflow.utils.file import File 

9from mddb_workflow.utils.structures import Structure 

10from mddb_workflow.utils.pyt_spells import get_frames_count 

11from mddb_workflow.utils.arg_cksum import get_cksum_id 

12from mddb_workflow.utils.type_hints import * 

13 

14from mddb_workflow.tools.check_inputs import check_inputs, PREFILTERED_TOPOLOGY_EXCEPTION 

15from mddb_workflow.tools.conversions import convert 

16from mddb_workflow.tools.filter_atoms import filter_atoms 

17from mddb_workflow.tools.image_and_fit import image_and_fit 

18from mddb_workflow.tools.get_charges import get_charges 

19from mddb_workflow.tools.structure_corrector import structure_corrector 

20 

21 

22def is_amber_top (input_topology_file : 'File') -> bool: 

23 """Check if a .top file is from Amber. 

24 Returns True if it is Amber, False if it is Gromacs. 

25 """ 

26 if input_topology_file != MISSING_TOPOLOGY and \ 

27 input_topology_file.extension == 'top': 

28 with open(input_topology_file.path, 'r') as f: 

29 lines = f.readlines(5) 

30 

31 # If all non-empty first words are '%' assume Amber (.prmtop) 

32 first_words = {line.split()[0] for line in lines if line.strip()} 

33 if '%VERSION' in first_words: 

34 return True 

35 

36 # If any line starts with ';' or '[' assume Gromacs (.top) 

37 first_chars = {word[0] for word in first_words} 

38 if any(c in (';', '[') for c in first_chars): 

39 return False 

40 

41 # Otherwise we cannot decide 

42 raise InputError('Unable to infer topology format from first five lines') 

43 return False 

44 

45 

46def process_input_files ( 

47 input_structure_file : 'File', 

48 input_trajectory_files : 'File', 

49 input_topology_file : 'File', 

50 # Output 

51 output_directory : str, 

52 output_topology_file : 'File', 

53 output_structure_file : 'File', 

54 output_trajectory_file : 'File', 

55 # Processing parameters 

56 filter_selection : str, 

57 image : bool, 

58 fit : bool, 

59 translation : tuple, 

60 # Make sure the MD is used only to set values or use its functions, but not to get values 

61 # Values must be passed separately as inputs so the task can identify when inputs change 

62 self : 'MD', 

63 # Get the task which is calling this function 

64 # Thus we may know which inputs have changed compared to previous runs 

65 task : 'Task', 

66 # The faith argument 

67 faith : bool, 

68): 

69 """Process input files to generate the processed files. 

70 This process corrects and standarizes the topology, the trajectory and the structure. 

71 """ 

72 # Make sure we do not enter in a loop 

73 # This may happen when we read/call an output value/file by mistake 

74 if hasattr(self, '_processed'): raise RuntimeError('Looped processing') 

75 self._processed = True 

76 

77 # Input trajectories should have all the same format 

78 input_trajectory_formats = set([ trajectory_file.format for trajectory_file in input_trajectory_files ]) 

79 if len(input_trajectory_formats) > 1: 

80 raise InputError('All input trajectory files must have the same format') 

81 

82 # If the faith argument was passed then set input files as output files and exit 

83 if faith: 

84 self.register.add_warning(FAITH_BYPASS, 'All processing steps and tests were skipped') 

85 if output_structure_file.exists: output_structure_file.remove() 

86 output_structure_file.set_symlink_to(input_structure_file) 

87 if output_trajectory_file.exists: output_trajectory_file.remove() 

88 output_trajectory_file.set_symlink_to(input_trajectory_files[0]) 

89 if output_topology_file != MISSING_TOPOLOGY: 

90 if output_topology_file.exists: output_topology_file.remove() 

91 output_topology_file.set_symlink_to(input_topology_file) 

92 return 

93 self.register.remove_warnings(FAITH_BYPASS) 

94 

95 # --- TOPOLOGY FORMAT ASSUMTION --------------------------------------------------------- 

96 

97 # Make a super fast check and an assumption 

98 # Topologies with the .top extension for us are considered Gromacs topology format 

99 # However it is usual than Amber topologies (ideally .prmtop) are also '.top' 

100 # So if the trajectory is Amber and the topology is .top then assume it is Amber 

101 input_trajectories_format = list(input_trajectory_formats)[0] 

102 if is_amber_top(input_topology_file): 

103 # Creating a topology symlink/copy with the correct extension 

104 warn('Topology is .top but the trajectory is from Amber. It is assumed the topology is .prmtop') 

105 reformatted_topology_file = input_topology_file.reformat('prmtop') 

106 output_topology_file.path = output_topology_file.extensionless_filepath + '.prmtop' 

107 if input_structure_file == input_topology_file: 

108 input_structure_file = reformatted_topology_file 

109 input_topology_file = reformatted_topology_file 

110 

111 # --- FIRST CHECK ----------------------------------------------------------------------- 

112 

113 # Check input files match in number of atoms 

114 # Here we have not standarized the format so we must check differently with every format 

115 exceptions = check_inputs(input_structure_file, input_trajectory_files, input_topology_file) 

116 

117 # There is a chance that the inputs checker has prefiltered the topology to match trajectory 

118 # If this is the case then use the prefiltered topology from now on 

119 prefiltered_topology = exceptions.get(PREFILTERED_TOPOLOGY_EXCEPTION, None) 

120 if prefiltered_topology: 

121 if input_structure_file == input_topology_file: 

122 input_structure_file = prefiltered_topology 

123 input_topology_file = prefiltered_topology 

124 

125 # --- CONVERTING AND MERGING ------------------------------------------------------------ 

126 

127 # Set the output format for the already converted structure 

128 input_structure_format = input_structure_file.format 

129 output_structure_format = output_structure_file.format 

130 # Set the output file for the already converted structure 

131 # If input structure already matches the output format then avoid the renaming 

132 if input_structure_format == output_structure_format: 

133 converted_structure_file = input_structure_file 

134 else: 

135 converted_structure_file = File(f'{output_directory}/{CONVERTED_STRUCTURE}') 

136 # Set the output format for the already converted trajectory 

137 input_trajectories_format = list(input_trajectory_formats)[0] 

138 output_trajectory_format = output_trajectory_file.format 

139 # Set the output file for the already converted trajectory 

140 # If input trajectory already matches the output format and is unique then avoid the renaming 

141 if input_trajectories_format == output_trajectory_format and len(input_trajectory_files) == 1: 

142 converted_trajectory_file = input_trajectory_files[0] 

143 else: 

144 converted_trajectory_file = File(f'{output_directory}/{CONVERTED_TRAJECTORY}') 

145 # Join all input trajectory paths 

146 input_trajectory_paths = [ trajectory_file.path for trajectory_file in input_trajectory_files ] 

147 

148 # Set an intermeidate file for the trajectory while it is being converted 

149 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while converting 

150 incompleted_converted_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + CONVERTED_TRAJECTORY}' 

151 incompleted_converted_trajectory_file = File(incompleted_converted_trajectory_filepath) 

152 # If there is an incomplete trajectory then remove it 

153 if incompleted_converted_trajectory_file.exists: 

154 incompleted_converted_trajectory_file.remove() 

155 

156 # Convert input structure and trajectories to output structure and trajectory 

157 if not converted_structure_file.exists or not converted_trajectory_file.exists: 

158 print(' * Converting and merging') 

159 convert( 

160 input_structure_filepath = input_structure_file.path, 

161 output_structure_filepath = converted_structure_file.path, 

162 input_trajectory_filepaths = input_trajectory_paths, 

163 output_trajectory_filepath = incompleted_converted_trajectory_file.path, 

164 ) 

165 # Once converted, rename the trajectory file as completed 

166 # If the converted trajectory already exists then it means it is the input trajectory 

167 if converted_trajectory_file.exists: 

168 incompleted_converted_trajectory_file.remove() 

169 else: 

170 incompleted_converted_trajectory_file.rename_to(converted_trajectory_file) 

171 

172 # Topologies are never converted, but they are kept in their original format 

173 

174 # --- provisional reference structure --- 

175 

176 # Now that we MUST have a PDB file we can set a provisional structure instance 

177 # Note that this structure is not yet corrected so it must be used with care 

178 # Otherwise we could have silent errors 

179 provisional_structure = Structure.from_pdb_file(converted_structure_file.path) 

180 # Now we can set a provisional coarse grain selection 

181 # This selection is useful to avoid problems with CG atom elements 

182 # Since this is proviosonal we will make it silent 

183 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False) 

184 for atom_index in provisional_cg_selection.atom_indices: 

185 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT 

186 

187 # --- FILTERING ATOMS ------------------------------------------------------------ 

188 

189 # Find out if we need to filter 

190 # i.e. check if there is a selection filter and it matches some atoms 

191 must_filter = bool(filter_selection) 

192 

193 # Set output filenames for the already filtered structure and trajectory 

194 if must_filter: 

195 filtered_structure_file = File(f'{output_directory}/{FILTERED_STRUCTURE}') 

196 filtered_trajectory_file = File(f'{output_directory}/{FILTERED_TRAJECTORY}') 

197 else: 

198 filtered_structure_file = converted_structure_file 

199 filtered_trajectory_file = converted_trajectory_file 

200 # Note that this is the only step affecting topology and thus here we output the definitive topology 

201 filtered_topology_file = output_topology_file if must_filter else input_topology_file 

202 

203 # Set an intermeidate file for the trajectory while it is being filtered 

204 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while filtering 

205 incompleted_filtered_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + FILTERED_TRAJECTORY}' 

206 incompleted_filtered_trajectory_file = File(incompleted_filtered_trajectory_filepath) 

207 # If there is an incomplete trajectory then remove it 

208 if incompleted_filtered_trajectory_file.exists: 

209 incompleted_filtered_trajectory_file.remove() 

210 

211 # Check if any output file is missing 

212 missing_filter_output = not filtered_structure_file.exists \ 

213 or not filtered_trajectory_file.exists \ 

214 or (filtered_topology_file != MISSING_TOPOLOGY and not filtered_topology_file.exists) 

215 

216 # Check if parameters have changed 

217 # Note that for this specific step only filtering is important 

218 previous_filtered_parameters = self.cache.retrieve(FILTERED) 

219 same_filtered_parameters = previous_filtered_parameters == filter_selection 

220 

221 # Filter atoms in structure, trajectory and topology if required and not done yet 

222 if must_filter and (missing_filter_output or not same_filtered_parameters): 

223 print(' * Filtering atoms') 

224 filter_atoms( 

225 input_structure_file = converted_structure_file, 

226 input_trajectory_file = converted_trajectory_file, 

227 input_topology_file = input_topology_file, # We use input topology 

228 output_structure_file = filtered_structure_file, 

229 output_trajectory_file = incompleted_filtered_trajectory_file, 

230 output_topology_file = filtered_topology_file, # We genereate the definitive topology 

231 reference_structure = provisional_structure, 

232 filter_selection = filter_selection, 

233 ) 

234 # Once filetered, rename the trajectory file as completed 

235 # If the filtered trajectory already exists then it means it is the input trajectory 

236 if filtered_trajectory_file.exists: 

237 incompleted_filtered_trajectory_file.remove() 

238 else: 

239 incompleted_filtered_trajectory_file.rename_to(filtered_trajectory_file) 

240 self.cache.update(FILTERED, filter_selection) 

241 

242 # --- provisional reference structure --- 

243 

244 # Now that we have a filtered PDB file we have to update provisional structure instance 

245 # Note that this structure is not yet corrected so it must be used with care 

246 # Otherwise we could have silent errors 

247 provisional_structure = Structure.from_pdb_file(filtered_structure_file.path) 

248 # Again, set the coarse grain atoms 

249 # Since elements may be needed to guess PBC selection we must solve them right before 

250 # Since this is proviosonal we will make it silent 

251 provisional_cg_selection = self._set_cg_selection(provisional_structure, verbose=False) 

252 for atom_index in provisional_cg_selection.atom_indices: 

253 provisional_structure.atoms[atom_index].element = CG_ATOM_ELEMENT 

254 # Also we can set a provisional PBC selection 

255 # This selection is useful both for imaging/fitting and for the correction 

256 # We will make sure that the provisonal and the final PBC selections match 

257 # Since this is proviosonal we will make it silent 

258 provisional_pbc_selection = self._set_pbc_selection(provisional_structure, verbose=False) 

259 

260 # --- IMAGING AND FITTING ------------------------------------------------------------ 

261 

262 # There is no logical way to know if the trajectory is already imaged or it must be imaged 

263 # We rely exclusively in input flags 

264 must_image = image or fit 

265 

266 # Set output filenames for the already filtered structure and trajectory 

267 if must_image: 

268 imaged_structure_file = File(f'{output_directory}/{IMAGED_STRUCTURE}') 

269 imaged_trajectory_file = File(f'{output_directory}/{IMAGED_TRAJECTORY}') 

270 else: 

271 imaged_structure_file = filtered_structure_file 

272 imaged_trajectory_file = filtered_trajectory_file 

273 

274 # Set an intermeidate file for the trajectory while it is being imaged 

275 # This prevents using an incomplete trajectory in case the workflow is suddenly interrupted while imaging 

276 incompleted_imaged_trajectory_filepath = f'{output_directory}/{INCOMPLETE_PREFIX + IMAGED_TRAJECTORY}' 

277 incompleted_imaged_trajectory_file = File(incompleted_imaged_trajectory_filepath) 

278 # If there is an incomplete trajectory then remove it 

279 if incompleted_imaged_trajectory_file.exists: 

280 incompleted_imaged_trajectory_file.remove() 

281 

282 # Check if any output file is missing 

283 missing_imaged_output = not imaged_structure_file.exists or not imaged_trajectory_file.exists 

284 

285 # Check if parameters have changed 

286 # Note that for this step the filter parameters is also important 

287 previous_imaged_parameters = self.cache.retrieve(IMAGED) 

288 same_imaged_parameters = previous_imaged_parameters == [image, fit, *translation] 

289 

290 # Image the trajectory if it is required 

291 # i.e. make the trajectory uniform avoiding atom jumps and making molecules to stay whole 

292 # Fit the trajectory by removing the translation and rotation if it is required 

293 if must_image and (missing_imaged_output or not same_imaged_parameters): 

294 print(' * Imaging and fitting') 

295 image_and_fit( 

296 input_structure_file = filtered_structure_file, 

297 input_trajectory_file = filtered_trajectory_file, 

298 input_topology_file = filtered_topology_file, # This is optional if there are no PBC residues 

299 output_structure_file = imaged_structure_file, 

300 output_trajectory_file = incompleted_imaged_trajectory_file, 

301 image = image, 

302 fit = fit, 

303 translation = translation, 

304 structure = provisional_structure, 

305 pbc_selection = provisional_pbc_selection 

306 ) 

307 # Once imaged, rename the trajectory file as completed 

308 # If the imaged trajectory already exists then it means it is the input trajectory 

309 if imaged_trajectory_file.exists: 

310 incompleted_imaged_trajectory_file.remove() 

311 else: 

312 incompleted_imaged_trajectory_file.rename_to(imaged_trajectory_file) 

313 # Update the cache 

314 self.cache.update(IMAGED, [image, fit, *translation]) 

315 # Update the provisional strucutre coordinates 

316 imaged_structure = Structure.from_pdb_file(imaged_structure_file.path) 

317 imaged_structure_coords = [ atom.coords for atom in imaged_structure.atoms ] 

318 provisional_structure.set_new_coordinates(imaged_structure_coords) 

319 

320 # --- CORRECTING STRUCTURE ------------------------------------------------------------ 

321 

322 # Note that this step, although it is foucsed in the structure, requires also the trajectory 

323 # Also the trajectory may be altered in very rare cases where coordinates must be resorted 

324 

325 # There is no possible reason to not correct the structure 

326 # This is the last step so the output files will be named as the output files of the whole processing 

327 

328 # WARNING: 

329 # For the correcting function we need the number of snapshots and at this point it should not be defined 

330 # Snapshots are calculated by default from the already processed structure and trajectory 

331 # For this reason we can not rely on the public snapshots getter 

332 # We must calculate snapshots here using last step structure and trajectory 

333 snapshots = None 

334 # If we already have a value in the cache then use it, unless the input trajectory has changed 

335 same_trajectory = 'input_trajectory_files' not in task.changed_inputs 

336 if same_trajectory: snapshots = self.cache.retrieve(SNAPSHOTS_FLAG) 

337 # Calculate the new value 

338 if snapshots is None: 

339 snapshots = get_frames_count(imaged_structure_file, imaged_trajectory_file) 

340 # Update the MD task snapshots value 

341 self.get_snapshots.prefill(self, snapshots, { 

342 'structure_file': imaged_structure_file, 

343 'trajectory_file': imaged_trajectory_file 

344 }) 

345 # Save the snapshots value in the cache as well 

346 self.cache.update(SNAPSHOTS_FLAG, snapshots) 

347 

348 # WARNING: 

349 # We may need to resort atoms in the structure corrector function 

350 # In such case, bonds and charges must be resorted as well and saved apart to keep values coherent 

351 # Bonds are calculated during the structure corrector but atom charges must be extracted no 

352 charges = get_charges(filtered_topology_file) 

353 self.project.get_charges.prefill(self.project, charges, { 

354 'topology_file': filtered_topology_file 

355 }) 

356 

357 print(' * Correcting structure') 

358 

359 # Set output filenames for the already filtered structure and trajectory 

360 corrected_structure_filepath = f'{output_directory}/{CORRECTED_STRUCTURE}' 

361 corrected_structure_file = File(corrected_structure_filepath) 

362 corrected_trajectory_filepath = f'{output_directory}/{CORRECTED_TRAJECTORY}' 

363 corrected_trajectory_file = File(corrected_trajectory_filepath) 

364 

365 # Correct the structure 

366 # This function reads and or modifies the following MD variables: 

367 # snapshots, reference_bonds, register, cache, mercy, trust 

368 structure_corrector( 

369 structure = provisional_structure, 

370 input_trajectory_file = imaged_trajectory_file, 

371 input_topology_file = filtered_topology_file, 

372 output_structure_file = corrected_structure_file, 

373 output_trajectory_file = corrected_trajectory_file, 

374 MD = self, 

375 pbc_selection = provisional_pbc_selection, 

376 snapshots = snapshots, 

377 register = self.register, 

378 mercy = self.project.mercy, 

379 trust = self.project.trust, 

380 guess_bonds = self.project.guess_bonds, 

381 ignore_bonds = self.project.ignore_bonds, 

382 ) 

383 

384 # If the corrected output exists then use it 

385 # Otherwise use the previous step files 

386 # Corrected files are generated only when changes are made in these files 

387 corrected_structure_file = corrected_structure_file if corrected_structure_file.exists else imaged_structure_file 

388 corrected_trajectory_file = corrected_trajectory_file if corrected_trajectory_file.exists else imaged_trajectory_file 

389 

390 # Set for every type of file (structure, trajectory and topology) the input, the last processed step and the output files 

391 input_and_output_files = [ 

392 (input_structure_file, corrected_structure_file, output_structure_file), 

393 (input_trajectory_files[0], corrected_trajectory_file, output_trajectory_file), 

394 (input_topology_file, filtered_topology_file, output_topology_file) 

395 ] 

396 # Set a list of intermediate files 

397 intermediate_files = set([ 

398 converted_structure_file, converted_trajectory_file, 

399 filtered_structure_file, filtered_trajectory_file, 

400 imaged_structure_file, imaged_trajectory_file, 

401 ]) 

402 # Now we must rename files to match the output file 

403 # Processed files remain with some intermediate filename 

404 for input_file, processed_file, output_file in input_and_output_files: 

405 # If the processed file is already the output file then there is nothing to do here 

406 # This means it was already the input file and no changes were made 

407 if processed_file == output_file: continue 

408 # There is a chance that the input files have not been modified 

409 # This means the input format has already the output format and it is not to be imaged, fitted or corrected 

410 # However we need the output files to exist and we dont want to rename the original ones to conserve them 

411 # In order to not duplicate data, we will setup a symbolic link to the input files with the output filepaths 

412 if processed_file == input_file: 

413 # If output file exists and its the same as the input file, we can not create a symlink from a file to the same file 

414 if output_file.exists: 

415 output_file.remove() 

416 output_file.set_symlink_to(input_file) 

417 # Otherwise rename the last intermediate file as the output file 

418 else: 

419 # In case the processed file is a symlink we must make sure the symlink is not made to a intermediate step 

420 # Intermediate steps will be removed further and thus the symlink would break 

421 # If the symlinks points to the input file there is no problem though 

422 if processed_file.is_symlink(): 

423 target_file = processed_file.get_symlink() 

424 if target_file in intermediate_files: 

425 target_file.rename_to(output_file) 

426 else: 

427 processed_file.rename_to(output_file) 

428 # If the files is not a symlink then simply rename it 

429 else: 

430 processed_file.rename_to(output_file) 

431 

432 # Save the internal variables 

433 self._structure_file = output_structure_file 

434 self._trajectory_file = output_trajectory_file 

435 self.project._topology_file = output_topology_file 

436 

437 # If the input and output file have the same name then overwrite input cksums 

438 # Thus we avoid this task to run forever 

439 # DANI: Esto es provisional, hay que prohibir que los inputs se llamen como los outputs 

440 if input_structure_file == output_structure_file: 

441 task.cache_cksums['input_structure_file'] = get_cksum_id(output_structure_file) 

442 if len(input_trajectory_files) == 1 and input_trajectory_files[0] == output_trajectory_file: 

443 task.cache_cksums['input_trajectory_files'] = get_cksum_id([ output_trajectory_file ]) 

444 if input_topology_file == output_topology_file: 

445 task.cache_cksums['input_topology_file'] = get_cksum_id(output_topology_file) 

446 

447 # --- Definitive PBC selection --- 

448 

449 # Now that we have the corrected structure we can set the definitive PBC atoms 

450 # Make sure the selection is identical to the provisional selection 

451 if self.pbc_selection != provisional_pbc_selection: 

452 raise InputError('PBC selection is not consistent after correcting the structure. ' 

453 'Please consider using a different PBC selection. ' 

454 'Avoid relying in atom distances or elements to avoid this problem.') 

455 

456 # --- RUNNING FINAL TESTS ------------------------------------------------------------ 

457 

458 # Note that tests here do not modify any file 

459 

460 # Check the trajectory has not sudden jumps 

461 self.is_trajectory_integral() 

462 

463 # Make a final test summary 

464 self.print_tests_summary() 

465 

466 # Issue some warnings if failed or never run tests are skipped 

467 self._issue_required_test_warnings() 

468 

469 # --- Cleanup intermediate files 

470 

471 # Set a list of input files to NOT be removed 

472 inputs_files = set([ input_structure_file, *input_trajectory_files, input_topology_file ]) 

473 # We must make sure an intermediate file is not actually an input file before deleting it 

474 removable_files = intermediate_files - inputs_files 

475 # Now delete every removable file 

476 for removable_file in removable_files: 

477 # Note that a broken symlink does not 'exists' 

478 if removable_file.exists or removable_file.is_symlink(): 

479 removable_file.remove()