Coverage for model_workflow/tools/structure_corrector.py: 51%

139 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1from os import remove 

2 

3# Import local tools 

4from model_workflow.tools.get_bonds import find_safe_bonds, do_bonds_match, get_bonds_canonical_frame 

5from model_workflow.tools.get_bonds import get_excluded_atoms_selection 

6from model_workflow.tools.get_pdb_frames import get_pdb_frame 

7from model_workflow.utils.auxiliar import InputError, TestFailure, MISSING_BONDS 

8from model_workflow.utils.auxiliar import get_new_letter, save_json, warn 

9from model_workflow.utils.constants import CORRECT_ELEMENTS, STABLE_BONDS_FLAG, COHERENT_BONDS_FLAG 

10from model_workflow.utils.structures import Structure 

11from model_workflow.utils.type_hints import * 

12 

13# Analyze the structure looking for irregularities and then modify the structure to standarize the format 

14# 

15# Supported cases: 

16# 

17# * Missing/Non-standard elements -> Atom elements are guessed when missing and standarized (e.g. ZN -> Zn) 

18# * Unstable atom bonds: A bond is formed / broken because its 2 atoms are too close / far in the structure 

19# -> Coordinates in the pdb file are replaced by those of the first frame in the trajectory where bonds are stable 

20# * Incoherent atom bonds -> If an atom has unexpected bonds then the simulation may be wrong 

21# -> Stop the workflow and warn the user 

22# * Missing chains -> Chains are added through VMD 

23# * Splitted chain -> Kill the process and warn the user. This should never happen by just reading a pdb, but after correcting 

24# * Repeated chains -> Chains are renamed (e.g. A, G, B, G, C, G -> A, G, B, H, C, I) 

25# * Splitted residues -> Atoms are sorted together by residue. Trajectory coordinates are also sorted 

26# * Repeated residues -> Residues are renumerated (e.g. 1, 2, 3, 1, 2, 1, 2 -> 1, 2, 3, 4, 5, 6, 7) 

27# * Repeated atoms -> Atoms are renamed with their numeration (e.g. C, C, C, O, O -> C1, C2, C3, O1, O2) 

28 

29# Note that the 'mercy' flag may be passed for critical checkings to not kill the process on fail 

30# Note that the 'trust' flag may be passed for critical checkings to skip them 

31 

32# This function also sets some values in the passed MD 

33 

34def structure_corrector ( 

35 # Note that this is an early provisional structure 

36 structure : 'Structure', 

37 input_trajectory_file : Optional['File'], 

38 input_topology_file : Union['File', Exception], 

39 output_structure_file : 'File', 

40 output_trajectory_file : Optional['File'], 

41 MD : 'MD', 

42 # Note that this is an early provisional atom selection 

43 pbc_selection : 'Selection', 

44 snapshots : int, 

45 register : 'Register', 

46 mercy, 

47 trust 

48) -> dict: 

49 

50 # Write the inital output structure file which will be overwritten several times further 

51 structure.generate_pdb_file(output_structure_file.path) 

52 

53 # ------------------------------------------------------------------------------------------ 

54 # Missing/Non-standard elements ------------------------------------------------------------ 

55 # ------------------------------------------------------------------------------------------ 

56 

57 # It is important to fix elements before trying to fix bonds, since elements have an impact on bonds 

58 # VMD logic to find bonds relies in the atom element to set the covalent bond distance cutoff 

59 if structure.fix_atom_elements(trust=(CORRECT_ELEMENTS in trust)): 

60 # Update the structure file using the corrected structure 

61 print(' The structure file has been modified -> ' + output_structure_file.filename) 

62 structure.generate_pdb_file(output_structure_file.path) 

63 

64 # ------------------------------------------------------------------------------------------ 

65 # Unstable atom bonds ---------------------------------------------------------------------- 

66 # ------------------------------------------------------------------------------------------ 

67 

68 # Set if stable bonds have to be checked 

69 # Note that we must not skip this even if the test already passed 

70 # It may be a corrected structure the one which passed the structure, while this structure comes from the raw input 

71 must_check_stable_bonds = STABLE_BONDS_FLAG not in trust 

72 

73 # Get safe bonds 

74 # Use topology bonds if possible 

75 # Otherwise guess bonds by guessing bonds according to coordinates and atom radius for 10 frames along the trajectory 

76 safe_bonds = find_safe_bonds( 

77 input_topology_file, 

78 output_structure_file, 

79 input_trajectory_file, 

80 must_check_stable_bonds, 

81 snapshots, 

82 structure 

83 ) 

84 # If safe bonds do not match structure bonds then we have to fix it 

85 def check_stable_bonds (): 

86 # Save the current structure bonds to further compare with the safe bonds 

87 current_bonds = structure.bonds 

88 # Now force safe bonds in the structure even if they "match" already 

89 # Note that some bonds are excluded from the check and they may be wrong in the structure 

90 # e.g. coarse grain atom bonds 

91 structure.bonds = safe_bonds 

92 # If we have been requested to skip this test then we are done 

93 if not must_check_stable_bonds: return 

94 # Reset warnings related to this analysis 

95 register.remove_warnings(STABLE_BONDS_FLAG) 

96 # Set some atoms which are to be skipped from these test given their "fake" nature 

97 excluded_atoms_selection = get_excluded_atoms_selection(structure, pbc_selection) 

98 # If bonds match from the begining we are done as well 

99 print('Checking default structure bonds') 

100 if do_bonds_match(current_bonds, safe_bonds, excluded_atoms_selection, verbose=True, atoms=structure.atoms): 

101 register.update_test(STABLE_BONDS_FLAG, True) 

102 print(' They are good') 

103 return 

104 print(' They are wrong') 

105 # Find the first frame in the whole trajectory where safe bonds are respected 

106 safe_bonds_frame = get_bonds_canonical_frame( 

107 structure_file = output_structure_file, 

108 trajectory_file = input_trajectory_file, 

109 snapshots = snapshots, 

110 reference_bonds = safe_bonds, 

111 structure = structure, 

112 pbc_selection = pbc_selection 

113 ) 

114 MD.get_reference_frame._set_parent_output(MD, safe_bonds_frame) 

115 # If there is no canonical frame then stop here since there must be a problem 

116 if safe_bonds_frame == None: 

117 print('There is no canonical frame for safe bonds. Is the trajectory not imaged?') 

118 must_be_killed = STABLE_BONDS_FLAG not in mercy 

119 if must_be_killed: 

120 raise TestFailure('Failed to find stable bonds') 

121 register.update_test(STABLE_BONDS_FLAG, False) 

122 register.add_warning(STABLE_BONDS_FLAG, ('Could not find a frame in the trajectory respecting all bonds if bonds were guessed according to atom coordinates and radius.\n' 

123 'The main PDB structure is a default structure and it would be considered to have wrong bonds if they were predicted as previously stated.')) 

124 return 

125 # Set also the safe bonds frame structure to mine its coordinates 

126 safe_bonds_frame_filename = get_pdb_frame(output_structure_file.path, input_trajectory_file.path, safe_bonds_frame) 

127 safe_bonds_frame_structure = Structure.from_pdb_file(safe_bonds_frame_filename) 

128 # Set all coordinates in the main structure by copying the safe bonds frame coordinates 

129 for atom_1, atom_2 in zip(structure.atoms, safe_bonds_frame_structure.atoms): 

130 atom_1.coords = atom_2.coords 

131 # Remove the safe bonds frame since it is not required anymore 

132 remove(safe_bonds_frame_filename) 

133 # Set the modified variable as true since we have changes the structure 

134 # Update the structure file using the corrected structure 

135 print(' The structure file has been modified -> ' + output_structure_file.filename) 

136 structure.generate_pdb_file(output_structure_file.path) 

137 # Set the test as passed 

138 register.update_test(STABLE_BONDS_FLAG, True) 

139 check_stable_bonds() 

140 

141 # Write safe bonds back to the MD 

142 MD.project.get_reference_bonds._set_parent_output(MD.project, safe_bonds) 

143 

144 # ------------------------------------------------------------------------------------------ 

145 # Incoherent atom bonds --------------------------------------------------------------- 

146 # ------------------------------------------------------------------------------------------ 

147 

148 # Set if coherent bonds have to be checked 

149 must_check_coherent_bonds = COHERENT_BONDS_FLAG not in trust 

150 

151 # If this analysis has been already passed then we skip the process 

152 if register.tests.get(COHERENT_BONDS_FLAG, None) == True: 

153 must_check_stable_bonds = False 

154 

155 # Run the coherent bonds analysis if necessary 

156 if must_check_coherent_bonds: 

157 # Reset warnings related to this analysis 

158 register.remove_warnings(COHERENT_BONDS_FLAG) 

159 # If the test is not passed then report it 

160 if structure.check_incoherent_bonds(): 

161 print('FAIL: Uncoherent bonds were found') 

162 must_be_killed = COHERENT_BONDS_FLAG not in mercy 

163 if must_be_killed: 

164 raise TestFailure('Failed to find coherent bonds') 

165 register.update_test(COHERENT_BONDS_FLAG, False) 

166 register.add_warning(COHERENT_BONDS_FLAG, 'Some atoms may have a higher or lower number of bonds than they should according to their element.') 

167 # Set the test as succeed if all was good 

168 else: 

169 register.update_test(COHERENT_BONDS_FLAG, True) 

170 

171 # ------------------------------------------------------------------------------------------ 

172 # Missing chains --------------------------------------------------------------------------- 

173 # ------------------------------------------------------------------------------------------ 

174 

175 # Check if chains are missing. If so, create a new chainned structure and set it as the reference 

176 chains = structure.chains 

177 

178 # In case there are not chains at all 

179 if len(chains) == 1 and ( chains[0].name == ' ' or chains[0].name == 'X' ): 

180 print('WARNING: chains are missing and they will be added') 

181 # Stop here if we have bonds guessed from coarse grain (i.e. we have no topology) 

182 # Note that we rely in fragments (and thus in bonds) to guess chains 

183 if next((True for bonds in structure.bonds if bonds == MISSING_BONDS), False): 

184 raise InputError('We cannot guess chains with bonds guessed from coarse grain.\n' 

185 ' Please either provide a topology or set chains in the structure PDB file.') 

186 # Run the chainer 

187 structure.auto_chainer() 

188 # Update the structure file using the corrected structure 

189 print(' The structure file has been modified -> ' + output_structure_file.filename) 

190 structure.generate_pdb_file(output_structure_file.path) 

191 

192 else: 

193 # In case there are some missing chains 

194 # Note that atoms with no chain are not a problem for the workflow but they are for the web client 

195 unlettered_chain = next((chain for chain in chains if chain.name == ' '), None) 

196 if unlettered_chain: 

197 current_letters = set([ chain.name for chain in chains ]) 

198 new_letter = get_new_letter(current_letters) 

199 # If we run out of letters there may be some problematic chain configuration 

200 # In this cases we cannot respect the original chains 

201 if new_letter == None: 

202 warn('No more letters in the alphabel to fill missing chains -> All chains will be assigned from scratch') 

203 structure.auto_chainer() 

204 else: 

205 warn(f'Some chains are missing -> Unchained regions will be chained as {new_letter}') 

206 unlettered_chain.name = new_letter 

207 # Update the structure file using the corrected structure 

208 print(f' The structure file has been modified -> {output_structure_file.filename}') 

209 structure.generate_pdb_file(output_structure_file.path) 

210 

211 # ------------------------------------------------------------------------------------------ 

212 # Splitted chains -------------------------------------------------------------------------- 

213 # ------------------------------------------------------------------------------------------ 

214 

215 # Check if chains are splitted. If so, stop here and warn the user 

216 checked_chains = [] 

217 last_chain = None 

218 for atom in structure.atoms: 

219 chain_name = atom.chain.name 

220 if chain_name == last_chain: 

221 continue 

222 last_chain = chain_name 

223 if chain_name in checked_chains: 

224 # Note that this will never happen because of the pdb itself, duplicated chains are handled further 

225 # This will only happen if chains were missing and guessed, and there is something very wrong in the structure 

226 # Check fragments with the VMD and searh for wrong bonds 

227 raise TestFailure(f'We are having splitted chains (e.g. {chain_name})') 

228 checked_chains.append(chain_name) 

229 

230 # ------------------------------------------------------------------------------------------ 

231 # Repeated chains -------------------------------------------------------------------------- 

232 # ------------------------------------------------------------------------------------------ 

233 

234 if structure.check_repeated_chains(fix_chains=True, display_summary=True): 

235 # Update the structure file using the corrected structure 

236 print(' The structure file has been modified -> ' + output_structure_file.filename) 

237 structure.generate_pdb_file(output_structure_file.path) 

238 

239 # ------------------------------------------------------------------------------------------ 

240 # Coherent chains -------------------------------------------------------------------------- 

241 # ------------------------------------------------------------------------------------------ 

242 

243 # Makes sure polymers with sequence are all bonded 

244 # In other words, make sure there are not multiple proteins/nucleics labeled with the same chain 

245 # Note that their sequences otherwise will be merged silently, thus not reflecting reality 

246 # This would make protein mapping impossible for instance 

247 sequence_polymers_selection = structure.select_protein() + structure.select_nucleic() 

248 

249 # We also want to support having a solution of free aminoacids/nucleotides 

250 # So if residues in the chain are not bonded or bonded in very small fragments we accept it 

251 N_RESIDUES_CUTOFF = 3 

252 

253 # Save if we splitted chains 

254 had_to_split_chains = False 

255 

256 # Iterate sequence polymer chains 

257 for chain in structure.get_selection_chains(sequence_polymers_selection): 

258 # If there is only one fragment we are good 

259 chain_selection = chain.get_selection() 

260 fragments = list(structure.find_fragments(chain_selection)) 

261 if len(fragments) == 0: raise ValueError(f'No fragments found in chain {chain.name}') 

262 if len(fragments) == 1: continue 

263 # We also want to support having a solution of free aminoacids/nucleotides 

264 # So if residues in the chain are not bonded or bonded in very small fragments we accept it 

265 # Make a single fragments out of all small fragments 

266 independent_fragments = [] 

267 merged_fragment = None 

268 for fragment in fragments: 

269 # If the fragments reaches the size cutoff then send it to the independent fragments list 

270 residues = structure.get_selection_residues(fragment) 

271 if len(residues) > N_RESIDUES_CUTOFF: 

272 independent_fragments.append(fragment) 

273 continue 

274 # Otherwise merge it with other small fragments 

275 if merged_fragment: merged_fragment += fragment 

276 else: merged_fragment = fragment 

277 # Add the merged fragment to the list as well 

278 if merged_fragment: 

279 independent_fragments.append(merged_fragment) 

280 # If we have only one fragment after the merging then we are good 

281 if len(independent_fragments) == 1: continue 

282 # Otherwise rename fragments 

283 warn(f'Chain {chain.name} has sequence polymer(s) but not all atoms are bonded.') 

284 # We can skip the first fragment, so it can keep the original chain name 

285 for independent_fragment in independent_fragments[1:]: 

286 print(f' A fragment of this chain including {len(independent_fragment)} atoms will be splitted appart.') 

287 had_to_split_chains = True 

288 # Make a new chain for the current fragment 

289 new_chain_name = structure.get_next_available_chain_name(chain.name) 

290 print(f' This fragment will be renamed as chain "{new_chain_name}"') 

291 structure.set_selection_chain_name(independent_fragment, new_chain_name) 

292 

293 # If we did any change then save the structure 

294 if had_to_split_chains: 

295 print(' The structure file has been modified -> ' + output_structure_file.filename) 

296 structure.generate_pdb_file(output_structure_file.path) 

297 

298 # ------------------------------------------------------------------------------------------ 

299 # Splitted residues ------------------------------------------------------------------------ 

300 # ------------------------------------------------------------------------------------------ 

301 

302 if structure.check_repeated_residues(fix_residues=True, display_summary=True): 

303 # Update the structure file using the corrected structure 

304 print(' The structure file has been modified -> ' + output_structure_file.filename) 

305 structure.generate_pdb_file(output_structure_file.path) 

306 

307 # Sort trajectory coordinates in case atoms were sorted 

308 if input_trajectory_file.path and structure.trajectory_atom_sorter: 

309 # Save a warning in the register 

310 warn('Atoms have been sorted to solve splitted residues') 

311 print('Creating resorted files for atom bonds and charges') 

312 # Bonds are already resorted 

313 save_json(safe_bonds, MD.project.resorted_bonds_file.path, indent=4) 

314 # Charges are to be resorted 

315 charges = MD.project.get_charges._get_parent_output(MD.project) 

316 resorted_charges = [ charges[index] for index in structure.new_atom_order ] 

317 MD.project.get_charges._set_parent_output(MD.project, resorted_charges) 

318 save_json(resorted_charges, MD.project.resorted_charges_file.path, indent=4) 

319 print('Sorting trajectory coordinates to fit the new structure atom sort...') 

320 structure.trajectory_atom_sorter(output_structure_file, input_trajectory_file, output_trajectory_file) 

321 

322 # ------------------------------------------------------------------------------------------ 

323 # Repeated atoms --------------------------------------------------------------------------- 

324 # ------------------------------------------------------------------------------------------ 

325 

326 if structure.check_repeated_atoms(fix_atoms=True, display_summary=True): 

327 # Update the structure file using the corrected structure 

328 print(' The structure file has been modified -> ' + output_structure_file.filename) 

329 structure.generate_pdb_file(output_structure_file.path)