Coverage for model_workflow/tools/structure_corrector.py: 51%
139 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1from os import remove
3# Import local tools
4from model_workflow.tools.get_bonds import find_safe_bonds, do_bonds_match, get_bonds_canonical_frame
5from model_workflow.tools.get_bonds import get_excluded_atoms_selection
6from model_workflow.tools.get_pdb_frames import get_pdb_frame
7from model_workflow.utils.auxiliar import InputError, TestFailure, MISSING_BONDS
8from model_workflow.utils.auxiliar import get_new_letter, save_json, warn
9from model_workflow.utils.constants import CORRECT_ELEMENTS, STABLE_BONDS_FLAG, COHERENT_BONDS_FLAG
10from model_workflow.utils.structures import Structure
11from model_workflow.utils.type_hints import *
13# Analyze the structure looking for irregularities and then modify the structure to standarize the format
14#
15# Supported cases:
16#
17# * Missing/Non-standard elements -> Atom elements are guessed when missing and standarized (e.g. ZN -> Zn)
18# * Unstable atom bonds: A bond is formed / broken because its 2 atoms are too close / far in the structure
19# -> Coordinates in the pdb file are replaced by those of the first frame in the trajectory where bonds are stable
20# * Incoherent atom bonds -> If an atom has unexpected bonds then the simulation may be wrong
21# -> Stop the workflow and warn the user
22# * Missing chains -> Chains are added through VMD
23# * Splitted chain -> Kill the process and warn the user. This should never happen by just reading a pdb, but after correcting
24# * Repeated chains -> Chains are renamed (e.g. A, G, B, G, C, G -> A, G, B, H, C, I)
25# * Splitted residues -> Atoms are sorted together by residue. Trajectory coordinates are also sorted
26# * Repeated residues -> Residues are renumerated (e.g. 1, 2, 3, 1, 2, 1, 2 -> 1, 2, 3, 4, 5, 6, 7)
27# * Repeated atoms -> Atoms are renamed with their numeration (e.g. C, C, C, O, O -> C1, C2, C3, O1, O2)
29# Note that the 'mercy' flag may be passed for critical checkings to not kill the process on fail
30# Note that the 'trust' flag may be passed for critical checkings to skip them
32# This function also sets some values in the passed MD
34def structure_corrector (
35 # Note that this is an early provisional structure
36 structure : 'Structure',
37 input_trajectory_file : Optional['File'],
38 input_topology_file : Union['File', Exception],
39 output_structure_file : 'File',
40 output_trajectory_file : Optional['File'],
41 MD : 'MD',
42 # Note that this is an early provisional atom selection
43 pbc_selection : 'Selection',
44 snapshots : int,
45 register : 'Register',
46 mercy,
47 trust
48) -> dict:
50 # Write the inital output structure file which will be overwritten several times further
51 structure.generate_pdb_file(output_structure_file.path)
53 # ------------------------------------------------------------------------------------------
54 # Missing/Non-standard elements ------------------------------------------------------------
55 # ------------------------------------------------------------------------------------------
57 # It is important to fix elements before trying to fix bonds, since elements have an impact on bonds
58 # VMD logic to find bonds relies in the atom element to set the covalent bond distance cutoff
59 if structure.fix_atom_elements(trust=(CORRECT_ELEMENTS in trust)):
60 # Update the structure file using the corrected structure
61 print(' The structure file has been modified -> ' + output_structure_file.filename)
62 structure.generate_pdb_file(output_structure_file.path)
64 # ------------------------------------------------------------------------------------------
65 # Unstable atom bonds ----------------------------------------------------------------------
66 # ------------------------------------------------------------------------------------------
68 # Set if stable bonds have to be checked
69 # Note that we must not skip this even if the test already passed
70 # It may be a corrected structure the one which passed the structure, while this structure comes from the raw input
71 must_check_stable_bonds = STABLE_BONDS_FLAG not in trust
73 # Get safe bonds
74 # Use topology bonds if possible
75 # Otherwise guess bonds by guessing bonds according to coordinates and atom radius for 10 frames along the trajectory
76 safe_bonds = find_safe_bonds(
77 input_topology_file,
78 output_structure_file,
79 input_trajectory_file,
80 must_check_stable_bonds,
81 snapshots,
82 structure
83 )
84 # If safe bonds do not match structure bonds then we have to fix it
85 def check_stable_bonds ():
86 # Save the current structure bonds to further compare with the safe bonds
87 current_bonds = structure.bonds
88 # Now force safe bonds in the structure even if they "match" already
89 # Note that some bonds are excluded from the check and they may be wrong in the structure
90 # e.g. coarse grain atom bonds
91 structure.bonds = safe_bonds
92 # If we have been requested to skip this test then we are done
93 if not must_check_stable_bonds: return
94 # Reset warnings related to this analysis
95 register.remove_warnings(STABLE_BONDS_FLAG)
96 # Set some atoms which are to be skipped from these test given their "fake" nature
97 excluded_atoms_selection = get_excluded_atoms_selection(structure, pbc_selection)
98 # If bonds match from the begining we are done as well
99 print('Checking default structure bonds')
100 if do_bonds_match(current_bonds, safe_bonds, excluded_atoms_selection, verbose=True, atoms=structure.atoms):
101 register.update_test(STABLE_BONDS_FLAG, True)
102 print(' They are good')
103 return
104 print(' They are wrong')
105 # Find the first frame in the whole trajectory where safe bonds are respected
106 safe_bonds_frame = get_bonds_canonical_frame(
107 structure_file = output_structure_file,
108 trajectory_file = input_trajectory_file,
109 snapshots = snapshots,
110 reference_bonds = safe_bonds,
111 structure = structure,
112 pbc_selection = pbc_selection
113 )
114 MD.get_reference_frame._set_parent_output(MD, safe_bonds_frame)
115 # If there is no canonical frame then stop here since there must be a problem
116 if safe_bonds_frame == None:
117 print('There is no canonical frame for safe bonds. Is the trajectory not imaged?')
118 must_be_killed = STABLE_BONDS_FLAG not in mercy
119 if must_be_killed:
120 raise TestFailure('Failed to find stable bonds')
121 register.update_test(STABLE_BONDS_FLAG, False)
122 register.add_warning(STABLE_BONDS_FLAG, ('Could not find a frame in the trajectory respecting all bonds if bonds were guessed according to atom coordinates and radius.\n'
123 'The main PDB structure is a default structure and it would be considered to have wrong bonds if they were predicted as previously stated.'))
124 return
125 # Set also the safe bonds frame structure to mine its coordinates
126 safe_bonds_frame_filename = get_pdb_frame(output_structure_file.path, input_trajectory_file.path, safe_bonds_frame)
127 safe_bonds_frame_structure = Structure.from_pdb_file(safe_bonds_frame_filename)
128 # Set all coordinates in the main structure by copying the safe bonds frame coordinates
129 for atom_1, atom_2 in zip(structure.atoms, safe_bonds_frame_structure.atoms):
130 atom_1.coords = atom_2.coords
131 # Remove the safe bonds frame since it is not required anymore
132 remove(safe_bonds_frame_filename)
133 # Set the modified variable as true since we have changes the structure
134 # Update the structure file using the corrected structure
135 print(' The structure file has been modified -> ' + output_structure_file.filename)
136 structure.generate_pdb_file(output_structure_file.path)
137 # Set the test as passed
138 register.update_test(STABLE_BONDS_FLAG, True)
139 check_stable_bonds()
141 # Write safe bonds back to the MD
142 MD.project.get_reference_bonds._set_parent_output(MD.project, safe_bonds)
144 # ------------------------------------------------------------------------------------------
145 # Incoherent atom bonds ---------------------------------------------------------------
146 # ------------------------------------------------------------------------------------------
148 # Set if coherent bonds have to be checked
149 must_check_coherent_bonds = COHERENT_BONDS_FLAG not in trust
151 # If this analysis has been already passed then we skip the process
152 if register.tests.get(COHERENT_BONDS_FLAG, None) == True:
153 must_check_stable_bonds = False
155 # Run the coherent bonds analysis if necessary
156 if must_check_coherent_bonds:
157 # Reset warnings related to this analysis
158 register.remove_warnings(COHERENT_BONDS_FLAG)
159 # If the test is not passed then report it
160 if structure.check_incoherent_bonds():
161 print('FAIL: Uncoherent bonds were found')
162 must_be_killed = COHERENT_BONDS_FLAG not in mercy
163 if must_be_killed:
164 raise TestFailure('Failed to find coherent bonds')
165 register.update_test(COHERENT_BONDS_FLAG, False)
166 register.add_warning(COHERENT_BONDS_FLAG, 'Some atoms may have a higher or lower number of bonds than they should according to their element.')
167 # Set the test as succeed if all was good
168 else:
169 register.update_test(COHERENT_BONDS_FLAG, True)
171 # ------------------------------------------------------------------------------------------
172 # Missing chains ---------------------------------------------------------------------------
173 # ------------------------------------------------------------------------------------------
175 # Check if chains are missing. If so, create a new chainned structure and set it as the reference
176 chains = structure.chains
178 # In case there are not chains at all
179 if len(chains) == 1 and ( chains[0].name == ' ' or chains[0].name == 'X' ):
180 print('WARNING: chains are missing and they will be added')
181 # Stop here if we have bonds guessed from coarse grain (i.e. we have no topology)
182 # Note that we rely in fragments (and thus in bonds) to guess chains
183 if next((True for bonds in structure.bonds if bonds == MISSING_BONDS), False):
184 raise InputError('We cannot guess chains with bonds guessed from coarse grain.\n'
185 ' Please either provide a topology or set chains in the structure PDB file.')
186 # Run the chainer
187 structure.auto_chainer()
188 # Update the structure file using the corrected structure
189 print(' The structure file has been modified -> ' + output_structure_file.filename)
190 structure.generate_pdb_file(output_structure_file.path)
192 else:
193 # In case there are some missing chains
194 # Note that atoms with no chain are not a problem for the workflow but they are for the web client
195 unlettered_chain = next((chain for chain in chains if chain.name == ' '), None)
196 if unlettered_chain:
197 current_letters = set([ chain.name for chain in chains ])
198 new_letter = get_new_letter(current_letters)
199 # If we run out of letters there may be some problematic chain configuration
200 # In this cases we cannot respect the original chains
201 if new_letter == None:
202 warn('No more letters in the alphabel to fill missing chains -> All chains will be assigned from scratch')
203 structure.auto_chainer()
204 else:
205 warn(f'Some chains are missing -> Unchained regions will be chained as {new_letter}')
206 unlettered_chain.name = new_letter
207 # Update the structure file using the corrected structure
208 print(f' The structure file has been modified -> {output_structure_file.filename}')
209 structure.generate_pdb_file(output_structure_file.path)
211 # ------------------------------------------------------------------------------------------
212 # Splitted chains --------------------------------------------------------------------------
213 # ------------------------------------------------------------------------------------------
215 # Check if chains are splitted. If so, stop here and warn the user
216 checked_chains = []
217 last_chain = None
218 for atom in structure.atoms:
219 chain_name = atom.chain.name
220 if chain_name == last_chain:
221 continue
222 last_chain = chain_name
223 if chain_name in checked_chains:
224 # Note that this will never happen because of the pdb itself, duplicated chains are handled further
225 # This will only happen if chains were missing and guessed, and there is something very wrong in the structure
226 # Check fragments with the VMD and searh for wrong bonds
227 raise TestFailure(f'We are having splitted chains (e.g. {chain_name})')
228 checked_chains.append(chain_name)
230 # ------------------------------------------------------------------------------------------
231 # Repeated chains --------------------------------------------------------------------------
232 # ------------------------------------------------------------------------------------------
234 if structure.check_repeated_chains(fix_chains=True, display_summary=True):
235 # Update the structure file using the corrected structure
236 print(' The structure file has been modified -> ' + output_structure_file.filename)
237 structure.generate_pdb_file(output_structure_file.path)
239 # ------------------------------------------------------------------------------------------
240 # Coherent chains --------------------------------------------------------------------------
241 # ------------------------------------------------------------------------------------------
243 # Makes sure polymers with sequence are all bonded
244 # In other words, make sure there are not multiple proteins/nucleics labeled with the same chain
245 # Note that their sequences otherwise will be merged silently, thus not reflecting reality
246 # This would make protein mapping impossible for instance
247 sequence_polymers_selection = structure.select_protein() + structure.select_nucleic()
249 # We also want to support having a solution of free aminoacids/nucleotides
250 # So if residues in the chain are not bonded or bonded in very small fragments we accept it
251 N_RESIDUES_CUTOFF = 3
253 # Save if we splitted chains
254 had_to_split_chains = False
256 # Iterate sequence polymer chains
257 for chain in structure.get_selection_chains(sequence_polymers_selection):
258 # If there is only one fragment we are good
259 chain_selection = chain.get_selection()
260 fragments = list(structure.find_fragments(chain_selection))
261 if len(fragments) == 0: raise ValueError(f'No fragments found in chain {chain.name}')
262 if len(fragments) == 1: continue
263 # We also want to support having a solution of free aminoacids/nucleotides
264 # So if residues in the chain are not bonded or bonded in very small fragments we accept it
265 # Make a single fragments out of all small fragments
266 independent_fragments = []
267 merged_fragment = None
268 for fragment in fragments:
269 # If the fragments reaches the size cutoff then send it to the independent fragments list
270 residues = structure.get_selection_residues(fragment)
271 if len(residues) > N_RESIDUES_CUTOFF:
272 independent_fragments.append(fragment)
273 continue
274 # Otherwise merge it with other small fragments
275 if merged_fragment: merged_fragment += fragment
276 else: merged_fragment = fragment
277 # Add the merged fragment to the list as well
278 if merged_fragment:
279 independent_fragments.append(merged_fragment)
280 # If we have only one fragment after the merging then we are good
281 if len(independent_fragments) == 1: continue
282 # Otherwise rename fragments
283 warn(f'Chain {chain.name} has sequence polymer(s) but not all atoms are bonded.')
284 # We can skip the first fragment, so it can keep the original chain name
285 for independent_fragment in independent_fragments[1:]:
286 print(f' A fragment of this chain including {len(independent_fragment)} atoms will be splitted appart.')
287 had_to_split_chains = True
288 # Make a new chain for the current fragment
289 new_chain_name = structure.get_next_available_chain_name(chain.name)
290 print(f' This fragment will be renamed as chain "{new_chain_name}"')
291 structure.set_selection_chain_name(independent_fragment, new_chain_name)
293 # If we did any change then save the structure
294 if had_to_split_chains:
295 print(' The structure file has been modified -> ' + output_structure_file.filename)
296 structure.generate_pdb_file(output_structure_file.path)
298 # ------------------------------------------------------------------------------------------
299 # Splitted residues ------------------------------------------------------------------------
300 # ------------------------------------------------------------------------------------------
302 if structure.check_repeated_residues(fix_residues=True, display_summary=True):
303 # Update the structure file using the corrected structure
304 print(' The structure file has been modified -> ' + output_structure_file.filename)
305 structure.generate_pdb_file(output_structure_file.path)
307 # Sort trajectory coordinates in case atoms were sorted
308 if input_trajectory_file.path and structure.trajectory_atom_sorter:
309 # Save a warning in the register
310 warn('Atoms have been sorted to solve splitted residues')
311 print('Creating resorted files for atom bonds and charges')
312 # Bonds are already resorted
313 save_json(safe_bonds, MD.project.resorted_bonds_file.path, indent=4)
314 # Charges are to be resorted
315 charges = MD.project.get_charges._get_parent_output(MD.project)
316 resorted_charges = [ charges[index] for index in structure.new_atom_order ]
317 MD.project.get_charges._set_parent_output(MD.project, resorted_charges)
318 save_json(resorted_charges, MD.project.resorted_charges_file.path, indent=4)
319 print('Sorting trajectory coordinates to fit the new structure atom sort...')
320 structure.trajectory_atom_sorter(output_structure_file, input_trajectory_file, output_trajectory_file)
322 # ------------------------------------------------------------------------------------------
323 # Repeated atoms ---------------------------------------------------------------------------
324 # ------------------------------------------------------------------------------------------
326 if structure.check_repeated_atoms(fix_atoms=True, display_summary=True):
327 # Update the structure file using the corrected structure
328 print(' The structure file has been modified -> ' + output_structure_file.filename)
329 structure.generate_pdb_file(output_structure_file.path)