Coverage for mddb_workflow/tools/structure_corrector.py: 49%
151 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1from os import remove
2from mddb_workflow.tools.get_bonds import find_safe_bonds, do_bonds_match, get_bonds_reference_frame
3from mddb_workflow.tools.get_bonds import get_excluded_atoms_selection
4from mddb_workflow.tools.get_pdb_frames import get_pdb_frame
5from mddb_workflow.utils.auxiliar import InputError, TestFailure
6from mddb_workflow.utils.auxiliar import get_new_letter, save_json, warn
7from mddb_workflow.utils.constants import CORRECT_ELEMENTS, STABLE_BONDS_FLAG, COHERENT_BONDS_FLAG
8from mddb_workflow.utils.structures import Structure
9from mddb_workflow.utils.type_hints import *
12def structure_corrector (
13 # Note that this is an early provisional structure
14 structure : 'Structure',
15 input_trajectory_file : Optional['File'],
16 input_topology_file : Union['File', Exception],
17 output_structure_file : 'File',
18 output_trajectory_file : Optional['File'],
19 MD : 'MD',
20 # Note that this is an early provisional atom selection
21 pbc_selection : 'Selection',
22 snapshots : int,
23 register : 'Register',
24 mercy : list[str],
25 trust : list[str],
26 guess_bonds : bool
27) -> dict:
28 """
29 Analyze the structure looking for irregularities and then modify the structure to standarize the format.
31 Supported cases:
33 * Missing/Non-standard elements -> Atom elements are guessed when missing and standarized (e.g. ZN -> Zn)
34 * Unstable atom bonds: A bond is formed / broken because its 2 atoms are too close / far in the structure
35 -> Coordinates in the pdb file are replaced by those of the first frame in the trajectory where bonds are stable
36 * Incoherent atom bonds -> If an atom has unexpected bonds then the simulation may be wrong
37 -> Stop the workflow and warn the user
38 * Missing chains -> Chains are added through VMD
39 * Splitted chain -> Kill the process and warn the user. This should never happen by just reading a pdb, but after correcting
40 * Repeated chains -> Chains are renamed (e.g. A, G, B, G, C, G -> A, G, B, H, C, I)
41 * Splitted residues -> Atoms are sorted together by residue. Trajectory coordinates are also sorted
42 * Repeated residues -> Residues are renumerated (e.g. 1, 2, 3, 1, 2, 1, 2 -> 1, 2, 3, 4, 5, 6, 7)
43 * Repeated atoms -> Atoms are renamed with their numeration (e.g. C, C, C, O, O -> C1, C2, C3, O1, O2)
45 Note that the 'mercy' flag may be passed for critical checkings to not kill the process on fail.
46 Note that the 'trust' flag may be passed for critical checkings to skip them.
48 This function also sets some values in the passed MD.
49 """
51 # Write the inital output structure file which will be overwritten several times further
52 print(' The structure file has been copied -> ' + output_structure_file.filename)
53 structure.generate_pdb_file(output_structure_file.path)
55 # ------------------------------------------------------------------------------------------
56 # Missing/Non-standard elements ------------------------------------------------------------
57 # ------------------------------------------------------------------------------------------
59 # It is important to fix elements before trying to fix bonds, since elements have an impact on bonds
60 # VMD logic to find bonds relies in the atom element to set the covalent bond distance cutoff
61 if structure.fix_atom_elements(trust=(CORRECT_ELEMENTS in trust)):
62 # Update the structure file using the corrected structure
63 print(' The structure file has been modified (fix atom elements) -> ' + output_structure_file.filename)
64 structure.generate_pdb_file(output_structure_file.path)
66 # ------------------------------------------------------------------------------------------
67 # Unstable atom bonds ----------------------------------------------------------------------
68 # ------------------------------------------------------------------------------------------
70 # Set if stable bonds have to be checked
71 # Note that we must not skip this even if the test already passed
72 # It may be a corrected structure the one which passed the structure, while this structure comes from the raw input
73 must_check_stable_bonds = STABLE_BONDS_FLAG not in trust
75 # Get safe bonds
76 # Use topology bonds if possible
77 # Otherwise guess bonds by guessing bonds according to coordinates and atom radius for 10 frames along the trajectory
78 safe_bonds = find_safe_bonds(
79 input_topology_file,
80 output_structure_file,
81 input_trajectory_file,
82 must_check_stable_bonds,
83 snapshots,
84 structure,
85 guess_bonds
86 )
87 # If safe bonds do not match structure bonds then we have to fix it
88 def check_stable_bonds ():
89 # Save the current structure bonds to further compare with the safe bonds
90 current_bonds = structure.bonds
91 # Now force safe bonds in the structure even if they "match" already
92 # Note that some bonds are excluded from the check and they may be wrong in the structure
93 # e.g. coarse grain atom bonds
94 structure.bonds = safe_bonds
95 # If we have been requested to skip this test then we are done
96 if not must_check_stable_bonds: return
97 # Reset warnings related to this analysis
98 register.remove_warnings(STABLE_BONDS_FLAG)
99 # Set some atoms which are to be skipped from these test given their "fake" nature
100 excluded_atoms_selection = get_excluded_atoms_selection(structure, pbc_selection)
101 # If bonds match from the begining we are done as well
102 print(f'Checking default structure bonds ({STABLE_BONDS_FLAG})')
103 if do_bonds_match(current_bonds, safe_bonds, excluded_atoms_selection, verbose=True, atoms=structure.atoms):
104 register.update_test(STABLE_BONDS_FLAG, True)
105 print(' They are good')
106 return
107 print(' They are wrong')
108 # Find the first frame in the whole trajectory where safe bonds are respected
109 bonds_reference_frame = get_bonds_reference_frame(
110 structure_file = output_structure_file,
111 trajectory_file = input_trajectory_file,
112 snapshots = snapshots,
113 reference_bonds = safe_bonds,
114 structure = structure,
115 pbc_selection = pbc_selection
116 )
117 MD.get_reference_frame._set_parent_output(MD, bonds_reference_frame)
118 # If there is no reference frame then stop here since there must be a problem
119 if bonds_reference_frame == None:
120 print('There is no reference frame for the safe bonds. Is the trajectory not imaged?')
121 must_be_killed = STABLE_BONDS_FLAG not in mercy
122 if must_be_killed:
123 raise TestFailure('Failed to find stable bonds')
124 register.update_test(STABLE_BONDS_FLAG, False)
125 register.add_warning(STABLE_BONDS_FLAG, ('Could not find a frame in the trajectory respecting all bonds if bonds were guessed according to atom coordinates and radius.\n'
126 'The main PDB structure is a default structure and it would be considered to have wrong bonds if they were predicted as previously stated.'))
127 return
128 # Set also the safe bonds frame structure to mine its coordinates
129 safe_bonds_frame_filename = get_pdb_frame(output_structure_file.path, input_trajectory_file.path, bonds_reference_frame)
130 safe_bonds_frame_structure = Structure.from_pdb_file(safe_bonds_frame_filename)
131 # Set all coordinates in the main structure by copying the safe bonds frame coordinates
132 for atom_1, atom_2 in zip(structure.atoms, safe_bonds_frame_structure.atoms):
133 atom_1.coords = atom_2.coords
134 # Remove the safe bonds frame since it is not required anymore
135 remove(safe_bonds_frame_filename)
136 # Set the modified variable as true since we have changes the structure
137 # Update the structure file using the corrected structure
138 print(' The structure file has been modified (stable bonds) -> ' + output_structure_file.filename)
139 structure.generate_pdb_file(output_structure_file.path)
140 # Set the test as passed
141 register.update_test(STABLE_BONDS_FLAG, True)
142 check_stable_bonds()
144 # Write safe bonds back to the MD
145 MD.project.get_reference_bonds._set_parent_output(MD.project, safe_bonds)
147 # ------------------------------------------------------------------------------------------
148 # Incoherent residue bonds ---------------------------------------------------------------
149 # ------------------------------------------------------------------------------------------
151 # Make sure there are no disconnected groups of atoms in every residue
152 for residue in structure.residues:
153 # If the residue is missing bonds then we can not check if it is coherent
154 # We assume it is and continue
155 if residue.is_missing_any_bonds(): continue
156 # If the residue is coherent then continue
157 if residue.is_coherent(): continue
158 # Otherwise we report the problem
159 residue_selection = residue.get_selection()
160 fragments = list(structure.find_fragments(residue_selection))
161 if len(fragments) == 0: raise RuntimeError('Do we have an empty residue?')
162 if len(fragments) == 1: raise RuntimeError('Test failed but should not')
163 warn(f'Multiple fragments in residue {residue.index}: {residue}')
164 for f, fragment in enumerate(fragments, 1):
165 fragment_atoms = [ structure.atoms[index] for index in fragment.atom_indices ]
166 atom_names = [ atom.label for atom in fragment_atoms ]
167 print(f' Fragment {f}: {", ".join(atom_names)}')
168 raise TestFailure(f'Residue {residue.index}: {residue} is not coherent: some atoms are disconnected')
170 # ------------------------------------------------------------------------------------------
171 # Incoherent atom bonds ---------------------------------------------------------------
172 # ------------------------------------------------------------------------------------------
174 # Set if coherent bonds have to be checked
175 must_check_coherent_bonds = COHERENT_BONDS_FLAG not in trust
177 # If this analysis has been already passed then we skip the process
178 if register.tests.get(COHERENT_BONDS_FLAG, None) == True:
179 must_check_stable_bonds = False
181 # Run the coherent bonds analysis if necessary
182 if must_check_coherent_bonds:
183 # Reset warnings related to this analysis
184 register.remove_warnings(COHERENT_BONDS_FLAG)
185 # If the test is not passed then report it
186 if structure.check_incoherent_bonds():
187 print('FAIL: Uncoherent bonds were found')
188 must_be_killed = COHERENT_BONDS_FLAG not in mercy
189 if must_be_killed:
190 raise TestFailure('Failed to find coherent bonds')
191 register.update_test(COHERENT_BONDS_FLAG, False)
192 register.add_warning(COHERENT_BONDS_FLAG, 'Some atoms may have a higher or lower number of bonds than they should according to their element.')
193 # Set the test as succeed if all was good
194 else:
195 register.update_test(COHERENT_BONDS_FLAG, True)
197 # ------------------------------------------------------------------------------------------
198 # Missing chains ---------------------------------------------------------------------------
199 # ------------------------------------------------------------------------------------------
201 # Check if chains are missing. If so, create a new chainned structure and set it as the reference
202 chains = structure.chains
204 # In case there are not chains at all
205 if len(chains) == 1 and ( chains[0].name == ' ' or chains[0].name == 'X' ):
206 warn('Chains are missing and they will be added')
207 # Stop here if we have bonds guessed from coarse grain (i.e. we have no topology)
208 # Note that we rely in fragments (and thus in bonds) to guess chains
209 if structure.is_missing_any_bonds():
210 raise InputError('We cannot guess chains with bonds guessed from coarse grain.\n'
211 ' Please either provide a topology including bonds or set chains in the structure PDB file.')
212 # Run the chainer
213 structure.auto_chainer()
214 # Update the structure file using the corrected structure
215 print(' The structure file has been modified (no chains) -> ' + output_structure_file.filename)
216 structure.generate_pdb_file(output_structure_file.path)
218 else:
219 # In case there are some missing chains
220 # Note that atoms with no chain are not a problem for the workflow but they are for the web client
221 unlettered_chain = next((chain for chain in chains if chain.name == ' '), None)
222 if unlettered_chain:
223 current_letters = set([ chain.name for chain in chains ])
224 new_letter = get_new_letter(current_letters)
225 # If we run out of letters there may be some problematic chain configuration
226 # In this cases we cannot respect the original chains
227 if new_letter == None:
228 warn('No more letters in the alphabel to fill missing chains -> All chains will be assigned from scratch')
229 structure.auto_chainer()
230 else:
231 warn(f'Some chains are missing -> Unchained regions will be chained as {new_letter}')
232 unlettered_chain.name = new_letter
233 # Update the structure file using the corrected structure
234 print(f' The structure file has been modified (missing chains) -> {output_structure_file.filename}')
235 structure.generate_pdb_file(output_structure_file.path)
237 # ------------------------------------------------------------------------------------------
238 # Splitted chains --------------------------------------------------------------------------
239 # ------------------------------------------------------------------------------------------
241 # Note that this will never happen because of the pdb itself, duplicated chains are handled further
242 # This will only happen if chains were missing and guessed
243 # This may mean there is something wrong in the structure
244 # Check fragments with the VMD and searh for wrong bonds
245 if structure.check_splitted_chains(fix_chains = True, display_summary = True):
246 # Update the structure file using the corrected structure
247 print(' The structure file has been modified (splitted chains) -> ' + output_structure_file.filename)
248 structure.generate_pdb_file(output_structure_file.path)
250 # ------------------------------------------------------------------------------------------
251 # Repeated chains --------------------------------------------------------------------------
252 # ------------------------------------------------------------------------------------------
254 if structure.check_repeated_chains(fix_chains=True, display_summary=True):
255 # Update the structure file using the corrected structure
256 print(' The structure file has been modified (repeated chains) -> ' + output_structure_file.filename)
257 structure.generate_pdb_file(output_structure_file.path)
259 # ------------------------------------------------------------------------------------------
260 # Coherent chains --------------------------------------------------------------------------
261 # ------------------------------------------------------------------------------------------
263 # Makes sure polymers with sequence are all bonded
264 # In other words, make sure there are not multiple proteins/nucleics labeled with the same chain
265 # Note that their sequences otherwise will be merged silently, thus not reflecting reality
266 # This would make protein mapping impossible for instance
267 sequence_polymers_selection = structure.select_protein() + structure.select_nucleic()
269 # We also want to support having a solution of free aminoacids/nucleotides
270 # So if residues in the chain are not bonded or bonded in very small fragments we accept it
271 N_RESIDUES_CUTOFF = 3
273 # Save if we splitted chains
274 had_to_split_chains = False
276 # Iterate sequence polymer chains
277 for chain in structure.get_selection_chains(sequence_polymers_selection):
278 # If the chain is missing bonds then we can not check if it is coherent
279 # We assume it is and continue
280 if chain.is_missing_any_bonds(): continue
281 # If there is only one fragment we are good
282 chain_selection = chain.get_selection()
283 fragments = list(structure.find_fragments(chain_selection, atom_bonds=safe_bonds))
284 if len(fragments) == 0: raise ValueError(f'No fragments found in chain {chain.name}')
285 if len(fragments) == 1: continue
286 # We also want to support having a solution of free aminoacids/nucleotides
287 # So if residues in the chain are not bonded or bonded in very small fragments we accept it
288 # Make a single fragments out of all small fragments
289 independent_fragments = []
290 merged_fragment = None
291 for fragment in fragments:
292 # If the fragments reaches the size cutoff then send it to the independent fragments list
293 residues = structure.get_selection_residues(fragment)
294 if len(residues) > N_RESIDUES_CUTOFF:
295 independent_fragments.append(fragment)
296 continue
297 # Otherwise merge it with other small fragments
298 if merged_fragment: merged_fragment += fragment
299 else: merged_fragment = fragment
300 # Add the merged fragment to the list as well
301 if merged_fragment:
302 independent_fragments.append(merged_fragment)
303 # If we have only one fragment after the merging then we are good
304 if len(independent_fragments) == 1: continue
305 # Otherwise rename fragments
306 warn(f'Chain {chain.name} has sequence polymer(s) but not all atoms are bonded.')
307 # We can skip the first fragment, so it can keep the original chain name
308 for independent_fragment in independent_fragments[1:]:
309 print(f' A fragment of this chain including {len(independent_fragment)} atoms will be splitted appart.')
310 had_to_split_chains = True
311 # Make a new chain for the current fragment
312 new_chain_name = structure.get_next_available_chain_name(chain.name)
313 print(f' This fragment will be renamed as chain "{new_chain_name}"')
314 structure.set_selection_chain_name(independent_fragment, new_chain_name)
316 # If we did any change then save the structure
317 if had_to_split_chains:
318 print(' The structure file has been modified (coherent chains) -> ' + output_structure_file.filename)
319 structure.generate_pdb_file(output_structure_file.path)
321 # ------------------------------------------------------------------------------------------
322 # Merged residues ------------------------------------------------------------------------
323 # ------------------------------------------------------------------------------------------
325 # Check if the structure is missing bonds
326 missing_any_bonds = structure.is_missing_any_bonds()
328 # There may be residues which contain unconnected (unbonded) atoms. They are not allowed.
329 # They may come from a wrong parsing and be indeed duplicated residues.
330 # NEVER FORGET: Merged residues may be generated when calling the structure.auto_chainer
331 # Note that we need bonds to check for merged residues
332 # If bonds are missing then we skip this check
333 if not missing_any_bonds and structure.check_merged_residues(fix_residues = True, display_summary = True):
334 # Update the structure file using the corrected structure
335 print(' The structure file has been modified (merged residues) -> ' + output_structure_file.filename)
336 structure.generate_pdb_file(output_structure_file.path)
338 # ------------------------------------------------------------------------------------------
339 # Splitted residues ------------------------------------------------------------------------
340 # ------------------------------------------------------------------------------------------
342 # Note that we need bonds to check for splitted residues
343 # If bonds are missing then we skip this check
344 if not missing_any_bonds and structure.check_repeated_residues(fix_residues=True, display_summary=True):
345 # Update the structure file using the corrected structure
346 print(' The structure file has been modified (repeated residues) -> ' + output_structure_file.filename)
347 structure.generate_pdb_file(output_structure_file.path)
349 # Sort trajectory coordinates in case atoms were sorted
350 if input_trajectory_file.path and structure.trajectory_atom_sorter:
351 # Save a warning in the register
352 warn('Atoms have been sorted to solve splitted residues')
353 print('Creating resorted files for atom bonds and charges')
354 # Bonds are already resorted
355 save_json(safe_bonds, MD.project.resorted_bonds_file.path, indent=4)
356 # Charges are to be resorted
357 charges = MD.project.get_charges._get_parent_output(MD.project)
358 resorted_charges = [ charges[index] for index in structure.new_atom_order ]
359 MD.project.get_charges._set_parent_output(MD.project, resorted_charges)
360 save_json(resorted_charges, MD.project.resorted_charges_file.path, indent=4)
361 print('Sorting trajectory coordinates to fit the new structure atom sort...')
362 structure.trajectory_atom_sorter(output_structure_file, input_trajectory_file, output_trajectory_file)
364 # ------------------------------------------------------------------------------------------
365 # Repeated atoms ---------------------------------------------------------------------------
366 # ------------------------------------------------------------------------------------------
368 if structure.check_repeated_atoms(fix_atoms=True, display_summary=True):
369 # Update the structure file using the corrected structure
370 print(' The structure file has been modified (repeated atoms) -> ' + output_structure_file.filename)
371 structure.generate_pdb_file(output_structure_file.path)