Coverage for mddb_workflow / tools / structure_corrector.py: 61%
156 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1from os import remove
2from mddb_workflow.tools.get_bonds import find_safe_bonds, do_bonds_match, get_bonds_reference_frame
3from mddb_workflow.tools.get_bonds import get_excluded_atoms_selection
4from mddb_workflow.tools.get_pdb_frames import get_pdb_frame
5from mddb_workflow.tools.get_charges import get_charges
6from mddb_workflow.utils.auxiliar import InputError, TestFailure, MISSING_BONDS
7from mddb_workflow.utils.auxiliar import get_new_letter, save_json, warn
8from mddb_workflow.utils.constants import CORRECT_ELEMENTS, STABLE_BONDS_FLAG, COHERENT_BONDS_FLAG
9from mddb_workflow.utils.structures import Structure
10from mddb_workflow.utils.type_hints import *
13def structure_corrector(
14 # Note that this is an early provisional structure
15 structure: 'Structure',
16 input_trajectory_file: Optional['File'],
17 input_topology_file: Union['File', Exception],
18 output_structure_file: 'File',
19 output_trajectory_file: Optional['File'],
20 MD: 'MD',
21 # Note that this is an early provisional atom selection
22 pbc_selection: 'Selection',
23 snapshots: int,
24 register: 'Register',
25 mercy: list[str],
26 trust: list[str],
27 guess_bonds: bool,
28 ignore_bonds: bool,
29) -> dict:
30 """Analyze the structure looking for irregularities and then modify the structure to standarize the format.
32 Supported cases:
34 * Missing/Non-standard elements -> Atom elements are guessed when missing and standarized (e.g. ZN -> Zn)
35 * Unstable atom bonds: A bond is formed / broken because its 2 atoms are too close / far in the structure
36 -> Coordinates in the pdb file are replaced by those of the first frame in the trajectory where bonds are stable
37 * Incoherent atom bonds -> If an atom has unexpected bonds then the simulation may be wrong
38 -> Stop the workflow and warn the user
39 * Missing chains -> Chains are added through VMD
40 * Splitted chain -> Kill the process and warn the user. This should never happen by just reading a pdb, but after correcting
41 * Repeated chains -> Chains are renamed (e.g. A, G, B, G, C, G -> A, G, B, H, C, I)
42 * Splitted residues -> Atoms are sorted together by residue. Trajectory coordinates are also sorted
43 * Repeated residues -> Residues are renumerated (e.g. 1, 2, 3, 1, 2, 1, 2 -> 1, 2, 3, 4, 5, 6, 7)
44 * Repeated atoms -> Atoms are renamed with their numeration (e.g. C, C, C, O, O -> C1, C2, C3, O1, O2)
46 Note that the 'mercy' flag may be passed for critical checkings to not kill the process on fail.
47 Note that the 'trust' flag may be passed for critical checkings to skip them.
49 This function also sets some values in the passed MD.
51 """
52 # Write the inital output structure file which will be overwritten several times further
53 print(' The structure file has been copied -> ' + output_structure_file.filename)
54 structure.generate_pdb_file(output_structure_file.path)
56 # ------------------------------------------------------------------------------------------
57 # Missing/Non-standard elements ------------------------------------------------------------
58 # ------------------------------------------------------------------------------------------
60 # It is important to fix elements before trying to fix bonds, since elements have an impact on bonds
61 # VMD logic to find bonds relies in the atom element to set the covalent bond distance cutoff
62 if structure.fix_atom_elements(trust=(CORRECT_ELEMENTS in trust)):
63 # Update the structure file using the corrected structure
64 print(' The structure file has been modified (fix atom elements) -> ' + output_structure_file.filename)
65 structure.generate_pdb_file(output_structure_file.path)
67 # ------------------------------------------------------------------------------------------
68 # Unstable atom bonds ----------------------------------------------------------------------
69 # ------------------------------------------------------------------------------------------
71 # Set if stable bonds have to be checked
72 # Note that we must not skip this even if the test already passed
73 # It may be a corrected structure the one which passed the structure, while this structure comes from the raw input
74 must_check_stable_bonds = STABLE_BONDS_FLAG not in trust
76 # Get safe bonds
77 # Use topology bonds if possible
78 # Otherwise guess bonds by guessing bonds according to coordinates and atom radius for 10 frames along the trajectory
79 safe_bonds = find_safe_bonds(
80 input_topology_file,
81 output_structure_file,
82 input_trajectory_file,
83 must_check_stable_bonds,
84 snapshots,
85 structure,
86 register,
87 guess_bonds,
88 ignore_bonds
89 )
91 # Check if the structure is missing bonds
92 missing_any_bonds = any(bond == MISSING_BONDS for bond in safe_bonds)
93 if missing_any_bonds: must_check_stable_bonds = False
95 # If safe bonds do not match structure bonds then we have to fix it
96 def check_stable_bonds():
97 # Save the current structure bonds to further compare with the safe bonds
98 current_bonds = structure.bonds
99 # Now force safe bonds in the structure even if they "match" already
100 # Note that some bonds are excluded from the check and they may be wrong in the structure
101 # e.g. coarse grain atom bonds
102 structure.bonds = safe_bonds
103 # If we have been requested to skip this test then we are done
104 if not must_check_stable_bonds: return
105 # Reset warnings related to this analysis
106 register.remove_warnings(STABLE_BONDS_FLAG)
107 # Set some atoms which are to be skipped from these test given their "fake" nature
108 excluded_atoms_selection = get_excluded_atoms_selection(structure, pbc_selection)
109 # If bonds match from the begining we are done as well
110 print(f'Checking default structure bonds ({STABLE_BONDS_FLAG})')
111 if do_bonds_match(current_bonds, safe_bonds, excluded_atoms_selection, verbose=True, atoms=structure.atoms):
112 register.update_test(STABLE_BONDS_FLAG, True)
113 print(' They are good')
114 return
115 print(' They are wrong')
116 # Find the first frame in the whole trajectory where safe bonds are respected
117 bonds_reference_frame = get_bonds_reference_frame(
118 structure_file=output_structure_file,
119 trajectory_file=input_trajectory_file,
120 snapshots=snapshots,
121 reference_bonds=safe_bonds,
122 structure=structure,
123 pbc_selection=pbc_selection
124 )
125 # Update the task output so it does not have to be repeated further
126 # IMPORTANT: Note that this is not always run, but only when default structure bonds are wrong
127 # IMPORTANT: Thus it is NORMAL when the reference frame is calculated further in the workflow
128 MD.get_reference_frame.prefill(MD, bonds_reference_frame, {
129 'structure_file': output_structure_file,
130 'trajectory_file': input_trajectory_file,
131 'snapshots': snapshots,
132 'reference_bonds': safe_bonds,
133 'structure': structure,
134 'pbc_selection': pbc_selection
135 })
136 # If there is no reference frame then stop here since there must be a problem
137 if bonds_reference_frame == None:
138 print('There is no reference frame for the safe bonds. Is the trajectory not imaged?')
139 must_be_killed = STABLE_BONDS_FLAG not in mercy
140 if must_be_killed:
141 raise TestFailure('Failed to find stable bonds')
142 register.update_test(STABLE_BONDS_FLAG, False)
143 register.add_warning(STABLE_BONDS_FLAG, ('Could not find a frame in the trajectory respecting all bonds if bonds were guessed according to atom coordinates and radius.\n'
144 'The main PDB structure is a default structure and it would be considered to have wrong bonds if they were predicted as previously stated.'))
145 return
146 # Set also the safe bonds frame structure to mine its coordinates
147 safe_bonds_frame_filename = get_pdb_frame(output_structure_file.path, input_trajectory_file.path, bonds_reference_frame)
148 safe_bonds_frame_structure = Structure.from_pdb_file(safe_bonds_frame_filename)
149 # Set all coordinates in the main structure by copying the safe bonds frame coordinates
150 for atom_1, atom_2 in zip(structure.atoms, safe_bonds_frame_structure.atoms):
151 atom_1.coords = atom_2.coords
152 # Remove the safe bonds frame since it is not required anymore
153 remove(safe_bonds_frame_filename)
154 # Set the modified variable as true since we have changes the structure
155 # Update the structure file using the corrected structure
156 print(' The structure file has been modified (stable bonds) -> ' + output_structure_file.filename)
157 structure.generate_pdb_file(output_structure_file.path)
158 # Set the test as passed
159 register.update_test(STABLE_BONDS_FLAG, True)
160 check_stable_bonds()
162 # Write safe bonds back to the MD
163 MD.project.get_reference_bonds.prefill(MD.project, safe_bonds, {
164 'topology_file': input_topology_file,
165 'structure_file': output_structure_file,
166 'trajectory_file': input_trajectory_file,
167 'must_check_stable_bonds': must_check_stable_bonds,
168 'snapshots': snapshots,
169 'structure': structure,
170 'register': register,
171 'guess_bonds': guess_bonds,
172 'ignore_bonds': ignore_bonds,
173 })
175 # ------------------------------------------------------------------------------------------
176 # Incoherent residue bonds ---------------------------------------------------------------
177 # ------------------------------------------------------------------------------------------
179 # Make sure there are no disconnected groups of atoms in every residue
180 for residue in structure.residues:
181 # If the residue is missing bonds then we can not check if it is coherent
182 # We assume it is and continue
183 if residue.is_missing_any_bonds(): continue
184 # If the residue is coherent then continue
185 if residue.is_coherent(): continue
186 # Otherwise we report the problem
187 residue_selection = residue.get_selection()
188 fragments = list(structure.find_fragments(residue_selection))
189 if len(fragments) == 0: raise RuntimeError('Do we have an empty residue?')
190 if len(fragments) == 1: raise RuntimeError('Test failed but should not')
191 warn(f'Multiple fragments in residue {residue.index}: {residue}')
192 for f, fragment in enumerate(fragments, 1):
193 fragment_atoms = [structure.atoms[index] for index in fragment.atom_indices]
194 atom_names = [atom.label for atom in fragment_atoms]
195 print(f' Fragment {f}: {", ".join(atom_names)}')
196 raise TestFailure(f'Residue {residue.index}: {residue} is not coherent: some atoms are disconnected')
198 # ------------------------------------------------------------------------------------------
199 # Incoherent atom bonds ---------------------------------------------------------------
200 # ------------------------------------------------------------------------------------------
202 # Set if coherent bonds have to be checked
203 must_check_coherent_bonds = COHERENT_BONDS_FLAG not in trust
204 if missing_any_bonds: must_check_coherent_bonds = False
206 # If this analysis has been already passed then we skip the process
207 if register.tests.get(COHERENT_BONDS_FLAG, None) is True:
208 must_check_stable_bonds = False
210 # Run the coherent bonds analysis if necessary
211 if must_check_coherent_bonds:
212 # Reset warnings related to this analysis
213 register.remove_warnings(COHERENT_BONDS_FLAG)
214 # If the test is not passed then report it
215 if structure.check_incoherent_bonds():
216 print('FAIL: Uncoherent bonds were found')
217 must_be_killed = COHERENT_BONDS_FLAG not in mercy
218 if must_be_killed:
219 raise TestFailure('Failed to find coherent bonds')
220 register.update_test(COHERENT_BONDS_FLAG, False)
221 register.add_warning(COHERENT_BONDS_FLAG, 'Some atoms may have a higher or lower number of bonds than they should according to their element.')
222 # Set the test as succeed if all was good
223 else:
224 register.update_test(COHERENT_BONDS_FLAG, True)
226 # ------------------------------------------------------------------------------------------
227 # Missing chains ---------------------------------------------------------------------------
228 # ------------------------------------------------------------------------------------------
230 # Check if chains are missing. If so, create a new chainned structure and set it as the reference
231 chains = structure.chains
233 # In case there are not chains at all
234 if len(chains) == 1 and ( chains[0].name == ' ' or chains[0].name == 'X' ):
235 warn('Chains are missing and they will be added')
236 # Stop here if we have bonds guessed from coarse grain (i.e. we have no topology)
237 # Note that we rely in fragments (and thus in bonds) to guess chains
238 if structure.is_missing_any_bonds():
239 raise InputError('We cannot guess chains with bonds guessed from coarse grain.\n'
240 ' Please either provide a topology including bonds or set chains in the structure PDB file.')
241 # Run the chainer
242 structure.auto_chainer()
243 # Update the structure file using the corrected structure
244 print(' The structure file has been modified (no chains) -> ' + output_structure_file.filename)
245 structure.generate_pdb_file(output_structure_file.path)
247 else:
248 # In case there are some missing chains
249 # Note that atoms with no chain are not a problem for the workflow but they are for the web client
250 unlettered_chain = next((chain for chain in chains if chain.name == ' '), None)
251 if unlettered_chain:
252 current_letters = set([chain.name for chain in chains])
253 new_letter = get_new_letter(current_letters)
254 # If we run out of letters there may be some problematic chain configuration
255 # In this cases we cannot respect the original chains
256 if new_letter is None:
257 warn('No more letters in the alphabet to fill missing chains -> All chains will be assigned from scratch')
258 # Stop here if we have bonds guessed from coarse grain (i.e. we have no topology)
259 # Note that we rely in fragments (and thus in bonds) to guess chains
260 if structure.is_missing_any_bonds():
261 raise InputError('We cannot guess chains with bonds guessed from coarse grain.\n'
262 ' Please either provide a topology including bonds or set chains in the structure PDB file.')
263 structure.auto_chainer()
264 else:
265 warn(f'Some chains are missing -> Unchained regions will be chained as {new_letter}')
266 unlettered_chain.name = new_letter
267 # Update the structure file using the corrected structure
268 print(f' The structure file has been modified (missing chains) -> {output_structure_file.filename}')
269 structure.generate_pdb_file(output_structure_file.path)
271 # ------------------------------------------------------------------------------------------
272 # Splitted chains --------------------------------------------------------------------------
273 # ------------------------------------------------------------------------------------------
275 # Note that this will never happen because of the pdb itself, duplicated chains are handled further
276 # This will only happen if chains were missing and guessed
277 # This may mean there is something wrong in the structure
278 # Check fragments with the VMD and searh for wrong bonds
279 if structure.check_splitted_chains(fix_chains=True, display_summary=True):
280 # Update the structure file using the corrected structure
281 print(' The structure file has been modified (splitted chains) -> ' + output_structure_file.filename)
282 structure.generate_pdb_file(output_structure_file.path)
284 # ------------------------------------------------------------------------------------------
285 # Repeated chains --------------------------------------------------------------------------
286 # ------------------------------------------------------------------------------------------
288 if structure.check_repeated_chains(fix_chains=True, display_summary=True):
289 # Update the structure file using the corrected structure
290 print(' The structure file has been modified (repeated chains) -> ' + output_structure_file.filename)
291 structure.generate_pdb_file(output_structure_file.path)
293 # ------------------------------------------------------------------------------------------
294 # Coherent chains --------------------------------------------------------------------------
295 # ------------------------------------------------------------------------------------------
297 # Makes sure polymers with sequence are all bonded
298 # In other words, make sure there are not multiple proteins/nucleics labeled with the same chain
299 # Note that their sequences otherwise will be merged silently, thus not reflecting reality
300 # This would make protein mapping impossible for instance
301 sequence_polymers_selection = structure.select_protein() + structure.select_nucleic()
303 # We also want to support having a solution of free aminoacids/nucleotides
304 # So if residues in the chain are not bonded or bonded in very small fragments we accept it
305 N_RESIDUES_CUTOFF = 3
307 # Save if we splitted chains
308 had_to_split_chains = False
310 # Iterate sequence polymer chains
311 for chain in structure.get_selection_chains(sequence_polymers_selection):
312 # If the chain is missing bonds then we can not check if it is coherent
313 # We assume it is and continue
314 if chain.is_missing_any_bonds(): continue
315 # If there is only one fragment we are good
316 chain_selection = chain.get_selection()
317 fragments = list(structure.find_fragments(chain_selection, atom_bonds=safe_bonds))
318 if len(fragments) == 0: raise ValueError(f'No fragments found in chain {chain.name}')
319 if len(fragments) == 1: continue
320 # We also want to support having a solution of free aminoacids/nucleotides
321 # So if residues in the chain are not bonded or bonded in very small fragments we accept it
322 # Make a single fragments out of all small fragments
323 independent_fragments = []
324 merged_fragment = None
325 for fragment in fragments:
326 # If the fragments reaches the size cutoff then send it to the independent fragments list
327 residues = structure.get_selection_residues(fragment)
328 if len(residues) > N_RESIDUES_CUTOFF:
329 independent_fragments.append(fragment)
330 continue
331 # Otherwise merge it with other small fragments
332 if merged_fragment: merged_fragment += fragment
333 else: merged_fragment = fragment
334 # Add the merged fragment to the list as well
335 if merged_fragment:
336 independent_fragments.append(merged_fragment)
337 # If we have only one fragment after the merging then we are good
338 if len(independent_fragments) == 1: continue
339 # Otherwise rename fragments
340 warn(f'Chain {chain.name} has sequence polymer(s) but not all atoms are bonded.')
341 # We can skip the first fragment, so it can keep the original chain name
342 for independent_fragment in independent_fragments[1:]:
343 print(f' A fragment of this chain including {len(independent_fragment)} atoms will be splitted appart.')
344 had_to_split_chains = True
345 # Make a new chain for the current fragment
346 new_chain_name = structure.get_next_available_chain_name(chain.name)
347 print(f' This fragment will be renamed as chain "{new_chain_name}"')
348 structure.set_selection_chain_name(independent_fragment, new_chain_name)
350 # If we did any change then save the structure
351 if had_to_split_chains:
352 print(' The structure file has been modified (coherent chains) -> ' + output_structure_file.filename)
353 structure.generate_pdb_file(output_structure_file.path)
355 # ------------------------------------------------------------------------------------------
356 # Merged residues ------------------------------------------------------------------------
357 # ------------------------------------------------------------------------------------------
359 # There may be residues which contain unconnected (unbonded) atoms. They are not allowed.
360 # They may come from a wrong parsing and be indeed duplicated residues.
361 # NEVER FORGET: Merged residues may be generated when calling the structure.auto_chainer
362 # Note that we need bonds to check for merged residues
363 # If bonds are missing then we skip this check
364 if not missing_any_bonds and structure.check_merged_residues(fix_residues=True, display_summary=True):
365 # Update the structure file using the corrected structure
366 print(' The structure file has been modified (merged residues) -> ' + output_structure_file.filename)
367 structure.generate_pdb_file(output_structure_file.path)
369 # ------------------------------------------------------------------------------------------
370 # Splitted residues ------------------------------------------------------------------------
371 # ------------------------------------------------------------------------------------------
373 # Note that we need bonds to check for splitted residues
374 # If bonds are missing then we skip this check
375 if not missing_any_bonds and structure.check_repeated_residues(fix_residues=True, display_summary=True):
376 # Update the structure file using the corrected structure
377 print(' The structure file has been modified (repeated residues) -> ' + output_structure_file.filename)
378 structure.generate_pdb_file(output_structure_file.path)
380 # Sort trajectory coordinates in case atoms were sorted
381 if input_trajectory_file.path and structure.trajectory_atom_sorter:
382 # Save a warning in the register
383 warn('Atoms have been sorted to solve splitted residues')
384 print('Creating resorted files for atom bonds and charges')
385 # Bonds are already resorted
386 save_json(safe_bonds, MD.project.resorted_bonds_file.path, indent=4)
387 # Charges are to be resorted
388 # DANI: Esto no se prueba desde hace tiempo y ha sufrido cambios
389 # DANI: Debería funcionar pero puede tener algun bug
390 charges = get_charges(input_topology_file)
391 resorted_charges = [charges[index] for index in structure.new_atom_order]
392 MD.project.get_charges.prefill(MD.project, resorted_charges, {
393 'topology_file': input_topology_file
394 })
395 save_json(resorted_charges, MD.project.resorted_charges_file.path, indent=4)
396 print('Sorting trajectory coordinates to fit the new structure atom sort...')
397 structure.trajectory_atom_sorter(output_structure_file, input_trajectory_file, output_trajectory_file)
399 # ------------------------------------------------------------------------------------------
400 # Repeated atoms ---------------------------------------------------------------------------
401 # ------------------------------------------------------------------------------------------
403 if structure.check_repeated_atoms(fix_atoms=True, display_summary=True):
404 # Update the structure file using the corrected structure
405 print(' The structure file has been modified (repeated atoms) -> ' + output_structure_file.filename)
406 structure.generate_pdb_file(output_structure_file.path)