Coverage for mddb_workflow / tools / structure_corrector.py: 61%

156 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 18:45 +0000

1from os import remove 

2from mddb_workflow.tools.get_bonds import find_safe_bonds, do_bonds_match, get_bonds_reference_frame 

3from mddb_workflow.tools.get_bonds import get_excluded_atoms_selection 

4from mddb_workflow.tools.get_pdb_frames import get_pdb_frame 

5from mddb_workflow.tools.get_charges import get_charges 

6from mddb_workflow.utils.auxiliar import InputError, TestFailure, MISSING_BONDS 

7from mddb_workflow.utils.auxiliar import get_new_letter, save_json, warn 

8from mddb_workflow.utils.constants import CORRECT_ELEMENTS, STABLE_BONDS_FLAG, COHERENT_BONDS_FLAG 

9from mddb_workflow.utils.structures import Structure 

10from mddb_workflow.utils.type_hints import * 

11 

12 

13def structure_corrector( 

14 # Note that this is an early provisional structure 

15 structure: 'Structure', 

16 input_trajectory_file: Optional['File'], 

17 input_topology_file: Union['File', Exception], 

18 output_structure_file: 'File', 

19 output_trajectory_file: Optional['File'], 

20 MD: 'MD', 

21 # Note that this is an early provisional atom selection 

22 pbc_selection: 'Selection', 

23 snapshots: int, 

24 register: 'Register', 

25 mercy: list[str], 

26 trust: list[str], 

27 guess_bonds: bool, 

28 ignore_bonds: bool, 

29) -> dict: 

30 """Analyze the structure looking for irregularities and then modify the structure to standarize the format. 

31 

32 Supported cases: 

33 

34 * Missing/Non-standard elements -> Atom elements are guessed when missing and standarized (e.g. ZN -> Zn) 

35 * Unstable atom bonds: A bond is formed / broken because its 2 atoms are too close / far in the structure 

36 -> Coordinates in the pdb file are replaced by those of the first frame in the trajectory where bonds are stable 

37 * Incoherent atom bonds -> If an atom has unexpected bonds then the simulation may be wrong 

38 -> Stop the workflow and warn the user 

39 * Missing chains -> Chains are added through VMD 

40 * Splitted chain -> Kill the process and warn the user. This should never happen by just reading a pdb, but after correcting 

41 * Repeated chains -> Chains are renamed (e.g. A, G, B, G, C, G -> A, G, B, H, C, I) 

42 * Splitted residues -> Atoms are sorted together by residue. Trajectory coordinates are also sorted 

43 * Repeated residues -> Residues are renumerated (e.g. 1, 2, 3, 1, 2, 1, 2 -> 1, 2, 3, 4, 5, 6, 7) 

44 * Repeated atoms -> Atoms are renamed with their numeration (e.g. C, C, C, O, O -> C1, C2, C3, O1, O2) 

45 

46 Note that the 'mercy' flag may be passed for critical checkings to not kill the process on fail. 

47 Note that the 'trust' flag may be passed for critical checkings to skip them. 

48 

49 This function also sets some values in the passed MD. 

50 

51 """ 

52 # Write the inital output structure file which will be overwritten several times further 

53 print(' The structure file has been copied -> ' + output_structure_file.filename) 

54 structure.generate_pdb_file(output_structure_file.path) 

55 

56 # ------------------------------------------------------------------------------------------ 

57 # Missing/Non-standard elements ------------------------------------------------------------ 

58 # ------------------------------------------------------------------------------------------ 

59 

60 # It is important to fix elements before trying to fix bonds, since elements have an impact on bonds 

61 # VMD logic to find bonds relies in the atom element to set the covalent bond distance cutoff 

62 if structure.fix_atom_elements(trust=(CORRECT_ELEMENTS in trust)): 

63 # Update the structure file using the corrected structure 

64 print(' The structure file has been modified (fix atom elements) -> ' + output_structure_file.filename) 

65 structure.generate_pdb_file(output_structure_file.path) 

66 

67 # ------------------------------------------------------------------------------------------ 

68 # Unstable atom bonds ---------------------------------------------------------------------- 

69 # ------------------------------------------------------------------------------------------ 

70 

71 # Set if stable bonds have to be checked 

72 # Note that we must not skip this even if the test already passed 

73 # It may be a corrected structure the one which passed the structure, while this structure comes from the raw input 

74 must_check_stable_bonds = STABLE_BONDS_FLAG not in trust 

75 

76 # Get safe bonds 

77 # Use topology bonds if possible 

78 # Otherwise guess bonds by guessing bonds according to coordinates and atom radius for 10 frames along the trajectory 

79 safe_bonds = find_safe_bonds( 

80 input_topology_file, 

81 output_structure_file, 

82 input_trajectory_file, 

83 must_check_stable_bonds, 

84 snapshots, 

85 structure, 

86 register, 

87 guess_bonds, 

88 ignore_bonds 

89 ) 

90 

91 # Check if the structure is missing bonds 

92 missing_any_bonds = any(bond == MISSING_BONDS for bond in safe_bonds) 

93 if missing_any_bonds: must_check_stable_bonds = False 

94 

95 # If safe bonds do not match structure bonds then we have to fix it 

96 def check_stable_bonds(): 

97 # Save the current structure bonds to further compare with the safe bonds 

98 current_bonds = structure.bonds 

99 # Now force safe bonds in the structure even if they "match" already 

100 # Note that some bonds are excluded from the check and they may be wrong in the structure 

101 # e.g. coarse grain atom bonds 

102 structure.bonds = safe_bonds 

103 # If we have been requested to skip this test then we are done 

104 if not must_check_stable_bonds: return 

105 # Reset warnings related to this analysis 

106 register.remove_warnings(STABLE_BONDS_FLAG) 

107 # Set some atoms which are to be skipped from these test given their "fake" nature 

108 excluded_atoms_selection = get_excluded_atoms_selection(structure, pbc_selection) 

109 # If bonds match from the begining we are done as well 

110 print(f'Checking default structure bonds ({STABLE_BONDS_FLAG})') 

111 if do_bonds_match(current_bonds, safe_bonds, excluded_atoms_selection, verbose=True, atoms=structure.atoms): 

112 register.update_test(STABLE_BONDS_FLAG, True) 

113 print(' They are good') 

114 return 

115 print(' They are wrong') 

116 # Find the first frame in the whole trajectory where safe bonds are respected 

117 bonds_reference_frame = get_bonds_reference_frame( 

118 structure_file=output_structure_file, 

119 trajectory_file=input_trajectory_file, 

120 snapshots=snapshots, 

121 reference_bonds=safe_bonds, 

122 structure=structure, 

123 pbc_selection=pbc_selection 

124 ) 

125 # Update the task output so it does not have to be repeated further 

126 # IMPORTANT: Note that this is not always run, but only when default structure bonds are wrong 

127 # IMPORTANT: Thus it is NORMAL when the reference frame is calculated further in the workflow 

128 MD.get_reference_frame.prefill(MD, bonds_reference_frame, { 

129 'structure_file': output_structure_file, 

130 'trajectory_file': input_trajectory_file, 

131 'snapshots': snapshots, 

132 'reference_bonds': safe_bonds, 

133 'structure': structure, 

134 'pbc_selection': pbc_selection 

135 }) 

136 # If there is no reference frame then stop here since there must be a problem 

137 if bonds_reference_frame == None: 

138 print('There is no reference frame for the safe bonds. Is the trajectory not imaged?') 

139 must_be_killed = STABLE_BONDS_FLAG not in mercy 

140 if must_be_killed: 

141 raise TestFailure('Failed to find stable bonds') 

142 register.update_test(STABLE_BONDS_FLAG, False) 

143 register.add_warning(STABLE_BONDS_FLAG, ('Could not find a frame in the trajectory respecting all bonds if bonds were guessed according to atom coordinates and radius.\n' 

144 'The main PDB structure is a default structure and it would be considered to have wrong bonds if they were predicted as previously stated.')) 

145 return 

146 # Set also the safe bonds frame structure to mine its coordinates 

147 safe_bonds_frame_filename = get_pdb_frame(output_structure_file.path, input_trajectory_file.path, bonds_reference_frame) 

148 safe_bonds_frame_structure = Structure.from_pdb_file(safe_bonds_frame_filename) 

149 # Set all coordinates in the main structure by copying the safe bonds frame coordinates 

150 for atom_1, atom_2 in zip(structure.atoms, safe_bonds_frame_structure.atoms): 

151 atom_1.coords = atom_2.coords 

152 # Remove the safe bonds frame since it is not required anymore 

153 remove(safe_bonds_frame_filename) 

154 # Set the modified variable as true since we have changes the structure 

155 # Update the structure file using the corrected structure 

156 print(' The structure file has been modified (stable bonds) -> ' + output_structure_file.filename) 

157 structure.generate_pdb_file(output_structure_file.path) 

158 # Set the test as passed 

159 register.update_test(STABLE_BONDS_FLAG, True) 

160 check_stable_bonds() 

161 

162 # Write safe bonds back to the MD 

163 MD.project.get_reference_bonds.prefill(MD.project, safe_bonds, { 

164 'topology_file': input_topology_file, 

165 'structure_file': output_structure_file, 

166 'trajectory_file': input_trajectory_file, 

167 'must_check_stable_bonds': must_check_stable_bonds, 

168 'snapshots': snapshots, 

169 'structure': structure, 

170 'register': register, 

171 'guess_bonds': guess_bonds, 

172 'ignore_bonds': ignore_bonds, 

173 }) 

174 

175 # ------------------------------------------------------------------------------------------ 

176 # Incoherent residue bonds --------------------------------------------------------------- 

177 # ------------------------------------------------------------------------------------------ 

178 

179 # Make sure there are no disconnected groups of atoms in every residue 

180 for residue in structure.residues: 

181 # If the residue is missing bonds then we can not check if it is coherent 

182 # We assume it is and continue 

183 if residue.is_missing_any_bonds(): continue 

184 # If the residue is coherent then continue 

185 if residue.is_coherent(): continue 

186 # Otherwise we report the problem 

187 residue_selection = residue.get_selection() 

188 fragments = list(structure.find_fragments(residue_selection)) 

189 if len(fragments) == 0: raise RuntimeError('Do we have an empty residue?') 

190 if len(fragments) == 1: raise RuntimeError('Test failed but should not') 

191 warn(f'Multiple fragments in residue {residue.index}: {residue}') 

192 for f, fragment in enumerate(fragments, 1): 

193 fragment_atoms = [structure.atoms[index] for index in fragment.atom_indices] 

194 atom_names = [atom.label for atom in fragment_atoms] 

195 print(f' Fragment {f}: {", ".join(atom_names)}') 

196 raise TestFailure(f'Residue {residue.index}: {residue} is not coherent: some atoms are disconnected') 

197 

198 # ------------------------------------------------------------------------------------------ 

199 # Incoherent atom bonds --------------------------------------------------------------- 

200 # ------------------------------------------------------------------------------------------ 

201 

202 # Set if coherent bonds have to be checked 

203 must_check_coherent_bonds = COHERENT_BONDS_FLAG not in trust 

204 if missing_any_bonds: must_check_coherent_bonds = False 

205 

206 # If this analysis has been already passed then we skip the process 

207 if register.tests.get(COHERENT_BONDS_FLAG, None) is True: 

208 must_check_stable_bonds = False 

209 

210 # Run the coherent bonds analysis if necessary 

211 if must_check_coherent_bonds: 

212 # Reset warnings related to this analysis 

213 register.remove_warnings(COHERENT_BONDS_FLAG) 

214 # If the test is not passed then report it 

215 if structure.check_incoherent_bonds(): 

216 print('FAIL: Uncoherent bonds were found') 

217 must_be_killed = COHERENT_BONDS_FLAG not in mercy 

218 if must_be_killed: 

219 raise TestFailure('Failed to find coherent bonds') 

220 register.update_test(COHERENT_BONDS_FLAG, False) 

221 register.add_warning(COHERENT_BONDS_FLAG, 'Some atoms may have a higher or lower number of bonds than they should according to their element.') 

222 # Set the test as succeed if all was good 

223 else: 

224 register.update_test(COHERENT_BONDS_FLAG, True) 

225 

226 # ------------------------------------------------------------------------------------------ 

227 # Missing chains --------------------------------------------------------------------------- 

228 # ------------------------------------------------------------------------------------------ 

229 

230 # Check if chains are missing. If so, create a new chainned structure and set it as the reference 

231 chains = structure.chains 

232 

233 # In case there are not chains at all 

234 if len(chains) == 1 and ( chains[0].name == ' ' or chains[0].name == 'X' ): 

235 warn('Chains are missing and they will be added') 

236 # Stop here if we have bonds guessed from coarse grain (i.e. we have no topology) 

237 # Note that we rely in fragments (and thus in bonds) to guess chains 

238 if structure.is_missing_any_bonds(): 

239 raise InputError('We cannot guess chains with bonds guessed from coarse grain.\n' 

240 ' Please either provide a topology including bonds or set chains in the structure PDB file.') 

241 # Run the chainer 

242 structure.auto_chainer() 

243 # Update the structure file using the corrected structure 

244 print(' The structure file has been modified (no chains) -> ' + output_structure_file.filename) 

245 structure.generate_pdb_file(output_structure_file.path) 

246 

247 else: 

248 # In case there are some missing chains 

249 # Note that atoms with no chain are not a problem for the workflow but they are for the web client 

250 unlettered_chain = next((chain for chain in chains if chain.name == ' '), None) 

251 if unlettered_chain: 

252 current_letters = set([chain.name for chain in chains]) 

253 new_letter = get_new_letter(current_letters) 

254 # If we run out of letters there may be some problematic chain configuration 

255 # In this cases we cannot respect the original chains 

256 if new_letter is None: 

257 warn('No more letters in the alphabet to fill missing chains -> All chains will be assigned from scratch') 

258 # Stop here if we have bonds guessed from coarse grain (i.e. we have no topology) 

259 # Note that we rely in fragments (and thus in bonds) to guess chains 

260 if structure.is_missing_any_bonds(): 

261 raise InputError('We cannot guess chains with bonds guessed from coarse grain.\n' 

262 ' Please either provide a topology including bonds or set chains in the structure PDB file.') 

263 structure.auto_chainer() 

264 else: 

265 warn(f'Some chains are missing -> Unchained regions will be chained as {new_letter}') 

266 unlettered_chain.name = new_letter 

267 # Update the structure file using the corrected structure 

268 print(f' The structure file has been modified (missing chains) -> {output_structure_file.filename}') 

269 structure.generate_pdb_file(output_structure_file.path) 

270 

271 # ------------------------------------------------------------------------------------------ 

272 # Splitted chains -------------------------------------------------------------------------- 

273 # ------------------------------------------------------------------------------------------ 

274 

275 # Note that this will never happen because of the pdb itself, duplicated chains are handled further 

276 # This will only happen if chains were missing and guessed 

277 # This may mean there is something wrong in the structure 

278 # Check fragments with the VMD and searh for wrong bonds 

279 if structure.check_splitted_chains(fix_chains=True, display_summary=True): 

280 # Update the structure file using the corrected structure 

281 print(' The structure file has been modified (splitted chains) -> ' + output_structure_file.filename) 

282 structure.generate_pdb_file(output_structure_file.path) 

283 

284 # ------------------------------------------------------------------------------------------ 

285 # Repeated chains -------------------------------------------------------------------------- 

286 # ------------------------------------------------------------------------------------------ 

287 

288 if structure.check_repeated_chains(fix_chains=True, display_summary=True): 

289 # Update the structure file using the corrected structure 

290 print(' The structure file has been modified (repeated chains) -> ' + output_structure_file.filename) 

291 structure.generate_pdb_file(output_structure_file.path) 

292 

293 # ------------------------------------------------------------------------------------------ 

294 # Coherent chains -------------------------------------------------------------------------- 

295 # ------------------------------------------------------------------------------------------ 

296 

297 # Makes sure polymers with sequence are all bonded 

298 # In other words, make sure there are not multiple proteins/nucleics labeled with the same chain 

299 # Note that their sequences otherwise will be merged silently, thus not reflecting reality 

300 # This would make protein mapping impossible for instance 

301 sequence_polymers_selection = structure.select_protein() + structure.select_nucleic() 

302 

303 # We also want to support having a solution of free aminoacids/nucleotides 

304 # So if residues in the chain are not bonded or bonded in very small fragments we accept it 

305 N_RESIDUES_CUTOFF = 3 

306 

307 # Save if we splitted chains 

308 had_to_split_chains = False 

309 

310 # Iterate sequence polymer chains 

311 for chain in structure.get_selection_chains(sequence_polymers_selection): 

312 # If the chain is missing bonds then we can not check if it is coherent 

313 # We assume it is and continue 

314 if chain.is_missing_any_bonds(): continue 

315 # If there is only one fragment we are good 

316 chain_selection = chain.get_selection() 

317 fragments = list(structure.find_fragments(chain_selection, atom_bonds=safe_bonds)) 

318 if len(fragments) == 0: raise ValueError(f'No fragments found in chain {chain.name}') 

319 if len(fragments) == 1: continue 

320 # We also want to support having a solution of free aminoacids/nucleotides 

321 # So if residues in the chain are not bonded or bonded in very small fragments we accept it 

322 # Make a single fragments out of all small fragments 

323 independent_fragments = [] 

324 merged_fragment = None 

325 for fragment in fragments: 

326 # If the fragments reaches the size cutoff then send it to the independent fragments list 

327 residues = structure.get_selection_residues(fragment) 

328 if len(residues) > N_RESIDUES_CUTOFF: 

329 independent_fragments.append(fragment) 

330 continue 

331 # Otherwise merge it with other small fragments 

332 if merged_fragment: merged_fragment += fragment 

333 else: merged_fragment = fragment 

334 # Add the merged fragment to the list as well 

335 if merged_fragment: 

336 independent_fragments.append(merged_fragment) 

337 # If we have only one fragment after the merging then we are good 

338 if len(independent_fragments) == 1: continue 

339 # Otherwise rename fragments 

340 warn(f'Chain {chain.name} has sequence polymer(s) but not all atoms are bonded.') 

341 # We can skip the first fragment, so it can keep the original chain name 

342 for independent_fragment in independent_fragments[1:]: 

343 print(f' A fragment of this chain including {len(independent_fragment)} atoms will be splitted appart.') 

344 had_to_split_chains = True 

345 # Make a new chain for the current fragment 

346 new_chain_name = structure.get_next_available_chain_name(chain.name) 

347 print(f' This fragment will be renamed as chain "{new_chain_name}"') 

348 structure.set_selection_chain_name(independent_fragment, new_chain_name) 

349 

350 # If we did any change then save the structure 

351 if had_to_split_chains: 

352 print(' The structure file has been modified (coherent chains) -> ' + output_structure_file.filename) 

353 structure.generate_pdb_file(output_structure_file.path) 

354 

355 # ------------------------------------------------------------------------------------------ 

356 # Merged residues ------------------------------------------------------------------------ 

357 # ------------------------------------------------------------------------------------------ 

358 

359 # There may be residues which contain unconnected (unbonded) atoms. They are not allowed. 

360 # They may come from a wrong parsing and be indeed duplicated residues. 

361 # NEVER FORGET: Merged residues may be generated when calling the structure.auto_chainer 

362 # Note that we need bonds to check for merged residues 

363 # If bonds are missing then we skip this check 

364 if not missing_any_bonds and structure.check_merged_residues(fix_residues=True, display_summary=True): 

365 # Update the structure file using the corrected structure 

366 print(' The structure file has been modified (merged residues) -> ' + output_structure_file.filename) 

367 structure.generate_pdb_file(output_structure_file.path) 

368 

369 # ------------------------------------------------------------------------------------------ 

370 # Splitted residues ------------------------------------------------------------------------ 

371 # ------------------------------------------------------------------------------------------ 

372 

373 # Note that we need bonds to check for splitted residues 

374 # If bonds are missing then we skip this check 

375 if not missing_any_bonds and structure.check_repeated_residues(fix_residues=True, display_summary=True): 

376 # Update the structure file using the corrected structure 

377 print(' The structure file has been modified (repeated residues) -> ' + output_structure_file.filename) 

378 structure.generate_pdb_file(output_structure_file.path) 

379 

380 # Sort trajectory coordinates in case atoms were sorted 

381 if input_trajectory_file.path and structure.trajectory_atom_sorter: 

382 # Save a warning in the register 

383 warn('Atoms have been sorted to solve splitted residues') 

384 print('Creating resorted files for atom bonds and charges') 

385 # Bonds are already resorted 

386 save_json(safe_bonds, MD.project.resorted_bonds_file.path, indent=4) 

387 # Charges are to be resorted 

388 # DANI: Esto no se prueba desde hace tiempo y ha sufrido cambios 

389 # DANI: Debería funcionar pero puede tener algun bug 

390 charges = get_charges(input_topology_file) 

391 resorted_charges = [charges[index] for index in structure.new_atom_order] 

392 MD.project.get_charges.prefill(MD.project, resorted_charges, { 

393 'topology_file': input_topology_file 

394 }) 

395 save_json(resorted_charges, MD.project.resorted_charges_file.path, indent=4) 

396 print('Sorting trajectory coordinates to fit the new structure atom sort...') 

397 structure.trajectory_atom_sorter(output_structure_file, input_trajectory_file, output_trajectory_file) 

398 

399 # ------------------------------------------------------------------------------------------ 

400 # Repeated atoms --------------------------------------------------------------------------- 

401 # ------------------------------------------------------------------------------------------ 

402 

403 if structure.check_repeated_atoms(fix_atoms=True, display_summary=True): 

404 # Update the structure file using the corrected structure 

405 print(' The structure file has been modified (repeated atoms) -> ' + output_structure_file.filename) 

406 structure.generate_pdb_file(output_structure_file.path)