Coverage for mddb_workflow/tools/check_inputs.py: 56%
147 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1from mddb_workflow.utils.auxiliar import InputError, warn, CaptureOutput, load_json, MISSING_TOPOLOGY
2from mddb_workflow.utils.constants import STANDARD_TOPOLOGY_FILENAME
3from mddb_workflow.utils.pyt_spells import find_first_corrupted_frame
4from mddb_workflow.utils.gmx_spells import run_gromacs, mine_system_atoms_count, get_atom_count
5from mddb_workflow.utils.vmd_spells import vmd_to_pdb
6from mddb_workflow.utils.structures import Structure
7from mddb_workflow.utils.file import File
9from mddb_workflow.tools.guess_and_filter import guess_and_filter_topology
11from re import match, search
12from typing import *
13from scipy.io import netcdf_file
14import mdtraj as mdt
15import pytraj as pyt
17# Set some known message errors
18NETCDF_DTYPE_ERROR = 'When changing to a larger dtype, its size must be a divisor of the total size in bytes of the last axis of the array.'
19MDTRAJ_ATOM_MISMATCH_ERROR = r'xyz must be shape \(Any, ([0-9]*), 3\). You supplied \(1, ([0-9]*), 3\)'
20MDTRAJ_INSERTION_CODES_ERROR = r'^Could not convert residue number \[[0-9]*[a-zA-Z]\]$'
21PYTRAJ_XTC_ATOM_MISMATCH_ERROR = r'Error: # atoms in XTC file \(([0-9]*)\) does not match # atoms in (topology|parm) [\w.-]* \(([0-9]*)\)'
22GROMACS_ATOM_MISMATCH_ERROR = r'is larger than the number of atoms in the\ntrajectory file \(([0-9]*)\). There is a mismatch in the contents'
24# List supported formats
25TOPOLOGY_SUPPORTED_FORMATS = { 'tpr', 'top', 'prmtop', 'psf' }
26TRAJECTORY_SUPPORTED_FORMATS = { 'xtc', 'trr', 'nc', 'dcd', 'crd', 'pdb', 'rst7' }
27STRUCTURE_SUPPORTED_FORMATS = { *TOPOLOGY_SUPPORTED_FORMATS, 'pdb', 'gro' }
28GROMACS_TRAJECTORY_SUPPORTED_FORMATS = { 'xtc', 'trr'}
30# Auxiliar PDB file which may be generated to load non supported restart files
31AUXILIAR_PDB_FILE = '.auxiliar.pdb'
33# Set exceptions for fixes applied from here
34PREFILTERED_TOPOLOGY_EXCEPTION = Exception('Prefiltered topology')
36def check_inputs (
37 input_structure_file : 'File',
38 input_trajectory_files : list['File'],
39 input_topology_file : Union['File', Exception]) -> dict:
40 """
41 Check input files coherence and integrity.
42 If there is any problem then raises an input error.
43 Some exceptional problems may be fixed from here.
44 In these cases, both the exception and the modified file are returned in a final dict.
45 """
47 # Set the exceptions dict to be returned at the end
48 exceptions = {}
50 # Get a sample trajectory file and then check its format
51 # All input trajectory files must have the same format
52 trajectory_sample = input_trajectory_files[0]
54 # Check input files are supported by the workflow
55 if input_topology_file != MISSING_TOPOLOGY and input_topology_file.filename != STANDARD_TOPOLOGY_FILENAME and input_topology_file.format not in TOPOLOGY_SUPPORTED_FORMATS:
56 if input_topology_file.format in { 'pdb', 'gro' }:
57 raise InputError('A structure file is not supported as topology anymore. If there is no topology then use the argument "-top no"')
58 raise InputError(f'Topology {input_topology_file.path} has a not supported format. Try one of these: {", ".join(TOPOLOGY_SUPPORTED_FORMATS)}')
59 if trajectory_sample.format not in TRAJECTORY_SUPPORTED_FORMATS:
60 raise InputError(f'Trajectory {trajectory_sample.path} has a not supported format. Try one of these: {", ".join(TRAJECTORY_SUPPORTED_FORMATS)}')
61 if input_structure_file.format not in STRUCTURE_SUPPORTED_FORMATS:
62 raise InputError(f'Structure {input_structure_file.path} has a not supported format. Try one of these: {", ".join(STRUCTURE_SUPPORTED_FORMATS)}')
64 # Make sure the trajectory file is not corrupted
66 # Check if reading the trajectory raises the following error
67 # ValueError: When changing to a larger dtype, its size must be a divisor of the total size in bytes of the last axis of the array.
68 # This error may happen with NetCDF files and it is a bit shady
69 # Some tools may be able to read the first frames of the corrupted file: VMD and pytraj
70 # Some other tools will instantly fail to read it: MDtraj and MDAnalysis
71 if trajectory_sample.format == 'nc':
72 try:
73 # Iterate trajectory files
74 for trajectory_file in input_trajectory_files:
75 # This does not read the whole trajectory
76 netcdf_file(trajectory_file.path, 'r')
77 except Exception as error:
78 # If the error message matches with a known error then report the problem
79 error_message = str(error)
80 if error_message == NETCDF_DTYPE_ERROR:
81 warn(f'Corrupted trajectory file {trajectory_file.path}')
82 pytraj_input_topology = input_topology_file if input_topology_file != MISSING_TOPOLOGY else input_structure_file
83 first_corrupted_frame = find_first_corrupted_frame(pytraj_input_topology.path, trajectory_file.path)
84 print(f' However some tools may be able to read the first {first_corrupted_frame} frames: VMD and PyTraj')
85 raise InputError('Corrupted input trajectory file')
86 # If we do not know the error then raise it as is
87 else:
88 raise error
90 # Set a function to get atoms from a structure alone
91 def get_structure_atoms (structure_file : 'File') -> int:
92 # If this is not a Structure supported file then use an alternative function
93 if structure_file.format == 'gro':
94 return get_atom_count(structure_file)
95 # Get the number of atoms in the input structure
96 structure = Structure.from_file(structure_file.path)
97 return structure.atom_count
99 # Set a function to get atoms from structure and trajectory together
100 def get_structure_and_trajectory_atoms (structure_file : 'File', trajectory_file : 'File') -> tuple[int, int]:
101 # Note that declaring the iterator will not fail even when there is a mismatch
102 trajectory = mdt.iterload(trajectory_file.path, top=structure_file.path, chunk=1)
103 # We must consume the generator first value to make the error raise
104 frame = next(trajectory)
105 # Now obtain the number of atoms from the frame we just read
106 trajectory_atom_count = frame.n_atoms
107 # And still, it may happen that the topology has more atoms than the trajectory but it loads
108 # MDtraj may silently load as many coordinates as possible and discard the rest of atoms in topology
109 # This behaviour has been observed with a gromacs .top topology and a PDB used as trajectory
110 # Two double check the match, load the topology alone with PyTraj
111 topology = pyt.load_topology(structure_file.path)
112 structure_atom_count = topology.n_atoms
113 return structure_atom_count, trajectory_atom_count
115 # Set a function to get atoms from topology and trajectory together
116 def get_topology_and_trajectory_atoms (topology_file : 'File', trajectory_file : 'File') -> tuple[int, int]:
117 # To do so rely on different tools depending on the topology format
118 # If there is no topology file then just compare strucutre and trajectory an exit
119 if topology_file == MISSING_TOPOLOGY:
120 # We do not have a topology atom count to return
121 # Without a valid topology we can not count trajectory atoms either
122 return None, None
123 # If it is our standard topology then simply count the atom names
124 # Get trajectory atoms using the structure instead
125 if topology_file.filename == STANDARD_TOPOLOGY_FILENAME:
126 # Parse the json and count atoms
127 parsed_topology = load_json(topology_file.path)
128 topology_atom_count = len(parsed_topology['atom_names'])
129 # Without a valid topology we can not count trajectory atoms
130 return topology_atom_count, None
131 # For a TPR use Gromacs, which is its native tool
132 if topology_file.format == 'tpr':
133 # Make sure the trajectory is compatible with gromacs
134 if trajectory_file.format not in GROMACS_TRAJECTORY_SUPPORTED_FORMATS:
135 raise InputError('Why loading a TPR topology with a non-gromacs trajectory?')
136 # Run Gromacs just to generate a structure using all atoms in the topology and coordinates in the first frame
137 # If atoms do not match then we will see a specific error
138 output_sample_file = File('.sample.gro')
139 output_logs, error_logs = run_gromacs(f'trjconv -s {topology_file.path} \
140 -f {trajectory_file.path} -o {output_sample_file.path} -dump 0',
141 user_input = 'System', expected_output_filepath = None)
142 # Always get error logs and mine topology atoms
143 # Note that these logs include the output selection request from Gromacs
144 # This log should be always there, even if there was a mismatch and then Gromacs failed
145 topology_atom_count = mine_system_atoms_count(error_logs)
146 # If the output does not exist at this point it means something went wrong with gromacs
147 if not output_sample_file.exists:
148 # Check if we know the error
149 error_match = search(GROMACS_ATOM_MISMATCH_ERROR, error_logs)
150 if error_match:
151 # Get the trajectory atom count
152 trajectory_atom_count = int(error_match[1])
153 return topology_atom_count, trajectory_atom_count
154 # Otherwise just print the whole error logs and stop here anyway
155 print(output_logs)
156 print(error_logs)
157 raise SystemExit('Something went wrong with GROMACS during the checking')
158 # If we had an output then it means both topology and trajectory match in the number of atoms
159 # Cleanup the file we just created and proceed
160 output_sample_file.remove()
161 # If there was no problem then it means the trajectory atom count matches the topology atom count
162 trajectory_atom_count = topology_atom_count
163 return topology_atom_count, trajectory_atom_count
164 # For .top files we use PyTraj since MDtraj can not handle it
165 if topology_file.format == 'top':
166 return get_topology_and_trajectory_atoms_pytraj(topology_file, trajectory_file)
167 # At this point the topology should be supported by MDtraj
168 # However, f the trajectory is a restart file MDtraj will not be able to read it
169 # Make the conversion here, since restart files are single-frame trajectories this should be fast
170 use_auxiliar_pdb = False
171 if trajectory_file.format == 'rst7':
172 # Generate the auxiliar PDB file
173 vmd_to_pdb(topology_file.path, trajectory_file.path, AUXILIAR_PDB_FILE)
174 use_auxiliar_pdb = True
175 # For any other format use MDtraj
176 try:
177 # Note that declaring the iterator will not fail even when there is a mismatch
178 trajectory_path = AUXILIAR_PDB_FILE if use_auxiliar_pdb else trajectory_file.path
179 trajectory = mdt.iterload(trajectory_path, top=topology_file.path, chunk=1)
180 # We must consume the generator first value to make the error raise
181 frame = next(trajectory)
182 # Now obtain the number of atoms from the frame we just read
183 trajectory_atom_count = frame.n_atoms
184 # And still, it may happen that the topology has more atoms than the trajectory but it loads
185 # MDtraj may silently load as many coordinates as possible and discard the rest of atoms in topology
186 # This behaviour has been observed with a gromacs .top topology and a PDB used as trajectory
187 # Two double check the match, load the topology alone with PyTraj
188 topology = pyt.load_topology(topology_file.path)
189 topology_atom_count = topology.n_atoms
190 return topology_atom_count, trajectory_atom_count
191 except Exception as error:
192 # If the error message matches with a known error then report the problem
193 error_message = str(error)
194 error_match = match(MDTRAJ_ATOM_MISMATCH_ERROR, error_message)
195 if error_match:
196 topology_atom_count = int(error_match[1])
197 trajectory_atom_count = int(error_match[2])
198 return topology_atom_count, trajectory_atom_count
199 error_match = match(MDTRAJ_INSERTION_CODES_ERROR, error_message)
200 if error_match:
201 warn('The input topology has insertion codes.\n'+ \
202 ' Some tools may crash when reading the topology (MDtraj).\n'+ \
203 ' Some tools may ignore insertion codes when reading the topology (MDAnlysis, PyTraj, VMD).')
204 # Use other tool to read the topology
205 # Other tools could ignore the inserion codes
206 # However this is not a problem here, where we only care bout the number of atoms
207 return get_topology_and_trajectory_atoms_pytraj(topology_file, trajectory_file)
208 # If we do not know the error then raise it as is
209 raise error
211 # Set a function to get atoms from topology and trajectory together using pytraj
212 # This is an altermative method used when MDtraj can not handle it
213 def get_topology_and_trajectory_atoms_pytraj (topology_file : 'File', trajectory_file : 'File') -> tuple[int, int]:
214 # Note that calling iterload will print a error log when atoms do not match but will not raise a proper error
215 # To capture the error log we must throw this command wrapped in a stdout redirect
216 trajectory = None
217 with CaptureOutput('stderr') as output:
218 trajectory = pyt.iterload(trajectory_file.path, top=topology_file.path)
219 logs = output.captured_text
220 error_match = match(PYTRAJ_XTC_ATOM_MISMATCH_ERROR, logs)
221 if error_match:
222 topology_atom_count = int(error_match[3])
223 trajectory_atom_count = int(error_match[1])
224 # Now obtain the number of atoms from the frame we just read
225 else:
226 topology_atom_count = trajectory_atom_count = trajectory.n_atoms
227 return topology_atom_count, trajectory_atom_count
229 # Get topology and trajectory atom counts
230 topology_atom_count, trajectory_atom_count = get_topology_and_trajectory_atoms(input_topology_file, trajectory_sample)
232 # If we have the trajectory atom count then it means we had a valid topology
233 if trajectory_atom_count != None:
235 # Make sure their atom counts match
236 if topology_atom_count != trajectory_atom_count:
237 warn('Mismatch in the number of atoms between input files:\n' +
238 f' Topology "{input_topology_file.path}" -> {topology_atom_count} atoms\n' +
239 f' Trajectory "{trajectory_sample.path}" -> {trajectory_atom_count} atoms')
240 if topology_atom_count < trajectory_atom_count:
241 raise InputError('Trajectory has more atoms than topology, there is no way to fix this.')
242 # If the topology has more atoms than the trajectory however we may attempt to guess
243 # If we guess which atoms are the ones in the trajectory then we can filter the topology
244 else:
245 prefiltered_topology_filepath = f'{input_topology_file.basepath}/prefiltered.{input_topology_file.format}'
246 prefiltered_topology_file = File(prefiltered_topology_filepath)
247 guessed = guess_and_filter_topology(
248 input_topology_file,
249 prefiltered_topology_file,
250 trajectory_atom_count)
251 if guessed: exceptions[PREFILTERED_TOPOLOGY_EXCEPTION] = prefiltered_topology_file
252 else: raise InputError('Could not guess topology atom selection to match trajectory atoms count')
254 # If the topology file is already the structure file then there is no need to check it
255 if input_structure_file == input_topology_file:
256 print(f'Topology and trajectory files match in number of atoms: {trajectory_atom_count}')
257 return exceptions
259 # If the counts match then also get the structure atom count and compare
260 structure_atom_count = get_structure_atoms(input_structure_file)
262 # Make sure it matches the topology and trajectory atom count
263 if topology_atom_count != structure_atom_count:
264 raise InputError('Mismatch in the structure input file number of atoms:\n'+
265 f' Topology and trajectory -> {topology_atom_count} atoms\n' +
266 f' Structure "{input_structure_file.path}" -> {structure_atom_count} atoms')
268 # If we reached this point then it means everything is matching
269 print(f'All input files match in number of atoms: {trajectory_atom_count}')
270 return exceptions
272 # Otherwise it means we had not a valid topology file
273 # We must use the structure to find trajectory atoms
274 structure_atom_count, trajectory_atom_count = get_structure_and_trajectory_atoms(input_structure_file, trajectory_sample)
276 # Make sure their atom counts match
277 if structure_atom_count != trajectory_atom_count:
278 raise InputError('Mismatch in the number of atoms between input files:\n' +
279 f' Structure "{input_structure_file.path}" -> {structure_atom_count} atoms\n' +
280 f' Trajectory "{trajectory_sample.path}" -> {trajectory_atom_count} atoms')
282 # If we have a number of topology atoms then make sure it matches the structure and trajectory atoms
283 # This may happen if the topology is our standard topology file instead of a valid topology
284 if topology_atom_count != None and topology_atom_count != trajectory_atom_count:
285 raise InputError('Mismatch in the number of atoms between input files:\n' +
286 f' Structure and trajectory -> {trajectory_atom_count} atoms\n' +
287 f' Topology "{input_topology_file.path}" -> {topology_atom_count} atoms')
289 # If we made it this far it means all checkings are good
290 print(f'Input files match in number of atoms: {trajectory_atom_count}')
291 return exceptions