Coverage for mddb_workflow/utils/gmx_spells.py: 52%
226 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1from os import remove, rename
2from os.path import exists, getmtime
3from shutil import copyfile
4from subprocess import run, PIPE, Popen
5from re import search
6from time import time
8from mddb_workflow.utils.auxiliar import load_json
9from mddb_workflow.utils.constants import GROMACS_EXECUTABLE, GREY_HEADER, COLOR_END
10from mddb_workflow.utils.file import File
11from mddb_workflow.utils.type_hints import *
13from mddb_workflow.tools.fix_gromacs_masses import fix_gromacs_masses
15# Set a function to call gromacs in a more confortable and standarized way:
16# - Standard gromacs executable, which may be provided by the user
17# - Gromacs mass fixes by using a custom atommass.dat file with extended atom names
18# - Hidden unnecessary output logs and grey-colored necessary ones
19# - Missing output checks
20# Then return both output and error logs
21def run_gromacs(command : str, user_input : Optional[str] = None,
22 expected_output_filepath : Optional[str] = 'auto',
23 show_output_logs : bool = False, show_error_logs : bool = False) -> tuple[str, str]:
25 # Run a fix for gromacs if not done before
26 # Note that this is run always at the moment the code is read, no matter the command or calling origin
27 fix_gromacs_masses()
29 # In case we have user input we must open a process to then pipe it in the gromacs process
30 if user_input:
31 # The -e option allows the interpretation of '\', which is critial for the make_ndx command
32 # WARNING: Explicity split by whitespaces
33 # WARNING: If not specified then breaklines are also removed, which is ciritcal for make_ndx
34 user_input_process = Popen([ "echo", "-e", *user_input.split(' ') ], stdout=PIPE)
36 # Get the time at the moment we start to run the command
37 # WARNING: We must truncate this value, because the file mtime is truncated as well
38 # Otherwise it may happen that gromacs runs in less than 1 seconds and the start time is higher
39 start_time = time().__trunc__()
41 # Set the gromacs process
42 # Note that at this point the command is not yet run
43 process = run([ GROMACS_EXECUTABLE, *command.split(), '-quiet' ],
44 stdin = user_input_process.stdout if user_input else None,
45 stdout = PIPE, stderr = PIPE if not show_error_logs else None)
47 # If error is to be shown the color it in grey
48 # This is usually used to see progress logs
49 if show_error_logs: print(GREY_HEADER, end='\r')
51 # Consume the gromacs process output thus running the command
52 output_logs = process.stdout.decode()
53 # Consume also error logs, but this will not run the command again
54 error_logs = process.stderr.decode() if not show_error_logs else None
56 # End the grey coloring
57 if show_error_logs: print(COLOR_END, end='\r')
59 # In case the expected output is set as 'auto' we must guess it from the command
60 # Normally output comes after the '-o' option
61 if expected_output_filepath == 'auto':
62 command_splits = command.split()
63 if '-o' not in command_splits: expected_output_filepath = None
64 else:
65 option_flag_index = command_splits.index('-o')
66 expected_output_filepath = command_splits[option_flag_index + 1]
68 # If an output file was expected then check it was created
69 if expected_output_filepath:
70 # Make sure output exists
71 output_exists = exists(expected_output_filepath)
72 # Make sure output have been generated by the gromacs command and it is not an old output
73 output_is_old = False
74 if output_exists:
75 output_time = getmtime(expected_output_filepath)
76 output_is_old = output_time < start_time
77 # If output was not generated then we report the problem
78 if not output_exists or output_is_old:
79 # If we are missing the expetced output then report it
80 print(output_logs)
81 print(error_logs)
82 # Recreate the exact command
83 final_command = f'{GROMACS_EXECUTABLE} {command}'
84 if user_input: final_command += f' (with user input "{user_input}")'
85 raise SystemExit(f'Something went wrong with Gromacs while running "{GROMACS_EXECUTABLE} {command}"')
87 # If all was good then show final logs but only if it was requested
88 if show_output_logs: print(output_logs)
90 # Return outputs
91 return output_logs, error_logs
94# Get the first frame from a trajectory
95def get_first_frame (input_structure_filename : str, input_trajectory_filename : str, output_frame_filename : str):
96 # Run Gromacs
97 run_gromacs(f'trjconv -s {input_structure_filename} -f {input_trajectory_filename} \
98 -o {output_frame_filename} -dump 0', user_input = 'System')
100# Set function supported formats
101get_first_frame.format_sets = [
102 {
103 'inputs': {
104 'input_structure_filename': {'tpr', 'pdb', 'gro'},
105 'input_trajectory_filename': {'xtc', 'trr'}
106 },
107 'outputs': {
108 'output_frame_filename': {'pdb', 'gro'}
109 }
110 },
111 {
112 'inputs': {
113 'input_structure_filename': None,
114 'input_trajectory_filename': {'xtc', 'trr'}
115 },
116 'outputs': {
117 'output_frame_filename': {'xtc', 'trr'}
118 }
119 },
120 {
121 'inputs': {
122 'input_structure_filename': None,
123 'input_trajectory_filename': {'pdb'}
124 },
125 'outputs': {
126 'output_frame_filename': {'pdb', 'xtc', 'trr'}
127 }
128 },
129 {
130 'inputs': {
131 'input_structure_filename': None,
132 'input_trajectory_filename': {'gro'}
133 },
134 'outputs': {
135 'output_frame_filename': {'gro', 'xtc', 'trr'}
136 }
137 }
138]
140# Get the structure using the first frame getter function
141def get_structure (input_structure_filename : str, input_trajectory_filename : str, output_structure_filename : str):
142 get_first_frame(input_structure_filename, input_trajectory_filename, output_structure_filename)
143get_structure.format_sets = [
144 {
145 'inputs': {
146 'input_structure_filename': {'tpr', 'pdb', 'gro'},
147 'input_trajectory_filename': {'xtc', 'trr'}
148 },
149 'outputs': {
150 'output_structure_filename': {'pdb', 'gro'}
151 }
152 }
153]
155# Convert the structure using the first frame getter function (no trajectory is required)
156def get_structure_alone (input_structure_filename : str, output_structure_filename : str):
157 get_first_frame(input_structure_filename, input_structure_filename, output_structure_filename)
158get_structure_alone.format_sets = [
159 {
160 'inputs': {
161 'input_structure_filename': {'pdb', 'gro'},
162 },
163 'outputs': {
164 'output_structure_filename': {'pdb', 'gro'}
165 }
166 }
167]
169# Get gromacs supported trajectories merged and converted to a different format
170def merge_and_convert_trajectories (input_trajectory_filenames : list[str], output_trajectory_filename : str):
171 # Get trajectory formats
172 sample_trajectory = input_trajectory_filenames[0]
173 sample_trajectory_file = File(sample_trajectory)
174 input_trajectories_format = sample_trajectory_file.format
175 output_trajectory_file = File(output_trajectory_filename)
176 output_trajectory_format = output_trajectory_file.format
177 auxiliar_single_trajectory_filename = '.single_trajectory.' + input_trajectories_format
178 # If we have multiple trajectories then join them
179 if len(input_trajectory_filenames) > 1:
180 single_trajectory_filename = auxiliar_single_trajectory_filename
181 run_gromacs(f'trjcat -f {" ".join(input_trajectory_filenames)} \
182 -o {single_trajectory_filename}')
183 else:
184 single_trajectory_filename = sample_trajectory
185 # In case input and output formats are different we must convert the trajectory
186 if input_trajectories_format != output_trajectory_format:
187 run_gromacs(f'trjconv -f {single_trajectory_filename} \
188 -o {output_trajectory_filename}')
189 else:
190 copyfile(single_trajectory_filename, output_trajectory_filename)
191 # Remove residual files
192 if exists(auxiliar_single_trajectory_filename):
193 remove(auxiliar_single_trajectory_filename)
195merge_and_convert_trajectories.format_sets = [
196 {
197 'inputs': {
198 'input_trajectory_filenames': {'xtc', 'trr'}
199 },
200 'outputs': {
201 'output_trajectory_filename': {'xtc', 'trr'}
202 }
203 },
204 {
205 'inputs': {
206 'input_trajectory_filenames': {'pdb'}
207 },
208 'outputs': {
209 'output_trajectory_filename': {'pdb', 'xtc', 'trr'}
210 }
211 },
212 {
213 'inputs': {
214 'input_trajectory_filenames': {'gro'}
215 },
216 'outputs': {
217 'output_trajectory_filename': {'gro', 'xtc', 'trr'}
218 }
219 }
220]
222# Get specific frames from a trajectory
223def get_trajectory_subset (
224 input_trajectory_filename : str,
225 output_trajectory_filename : str,
226 start : int = 0,
227 end : int = None,
228 step : int = 1,
229 frames : list[int] = [],
230 skip : list[int] = []
231):
232 # Set a list with frame indices from
233 output_frames = frames if frames and len(frames) > 0 else [ frame for frame in range(start, end, step) if frame not in skip ]
235 # Generate the ndx file to target the desired frames
236 auxiliar_ndx_filename = '.frames.ndx'
237 generate_frames_ndx(output_frames, auxiliar_ndx_filename)
239 # Now run gromacs trjconv command in order to extract the desired frames
240 run_gromacs(f'trjconv -f {input_trajectory_filename} -o {output_trajectory_filename} \
241 -fr {auxiliar_ndx_filename}', user_input = 'System')
243 # Cleanup the auxiliar ndx file
244 remove(auxiliar_ndx_filename)
247get_trajectory_subset.format_sets = [
248 {
249 'inputs': {
250 'input_trajectory_filename': {'xtc', 'trr'}
251 },
252 'outputs': {
253 'output_trajectory_filename': {'xtc', 'trr'}
254 }
255 }
256]
258# Filter trajectory atoms
259def filter_structure (
260 input_structure_file : 'File',
261 output_structure_file : 'File',
262 input_selection : 'Selection'
263):
264 # Generate a ndx file with the desired selection
265 filter_selection_name = 'filter'
266 filter_index_content = input_selection.to_ndx(selection_name=filter_selection_name)
267 filter_index_filename = '.filter.ndx'
268 with open(filter_index_filename, 'w') as file:
269 file.write(filter_index_content)
271 # Filter the structure
272 run_gromacs(f'editconf -f {input_structure_file.path} -o {output_structure_file.path} \
273 -n {filter_index_filename}', user_input = filter_selection_name)
275 # Cleanup the index file
276 remove(filter_index_filename)
278filter_structure.format_sets = [
279 {
280 'inputs': {
281 'input_structure_file': {'pdb', 'gro'},
282 },
283 'outputs': {
284 'output_structure_file': {'pdb', 'gro'}
285 }
286 }
287]
289# Filter trajectory atoms
290def filter_trajectory (
291 input_structure_file : 'File',
292 input_trajectory_file : 'File',
293 output_trajectory_file : 'File',
294 input_selection : 'Selection'
295):
296 # Generate a ndx file with the desired selection
297 filter_selection_name = 'filter'
298 filter_index_content = input_selection.to_ndx(selection_name=filter_selection_name)
299 filter_index_filename = '.filter.ndx'
300 with open(filter_index_filename, 'w') as file:
301 file.write(filter_index_content)
303 # Filter the trajectory
304 # Now run gromacs trjconv command in order to extract the desired frames
305 run_gromacs(f'trjconv -s {input_structure_file.path} -f {input_trajectory_file.path} \
306 -o {output_trajectory_file.path} -n {filter_index_filename}',
307 user_input = filter_selection_name)
309 # Cleanup the index file
310 remove(filter_index_filename)
312filter_trajectory.format_sets = [
313 {
314 'inputs': {
315 'input_structure_file': {'tpr', 'pdb', 'gro'},
316 'input_trajectory_file': {'xtc', 'trr'}
317 },
318 'outputs': {
319 'output_trajectory_file': {'xtc', 'trr'}
320 }
321 }
322]
324# Set a regular expression to further mine data from gromacs logs
325GROMACS_SYSTEM_ATOMS_REGEX = r'System\) has[ ]+([0-9]*) elements'
327# Mine system atoms count from gromacs logs
328def mine_system_atoms_count (logs : str) -> int:
329 system_atoms_match = search(GROMACS_SYSTEM_ATOMS_REGEX, logs)
330 if not system_atoms_match:
331 print(logs)
332 raise ValueError('Failed to mine Gromacs error logs')
333 return int(system_atoms_match[1])
335# Count TPR atoms
336def get_tpr_atom_count (tpr_filepath : str) -> int:
337 # Make sure the filepath is valid
338 if not exists(tpr_filepath):
339 raise ValueError('Trying to count atoms from a topology which does not exist')
340 # Run Gromacs only to see the number of atoms in the TPR
341 output_logs, error_logs = run_gromacs(f'convert-tpr -s {tpr_filepath}', user_input = "whatever")
342 # Mine the number of atoms in the system from the logs
343 atom_count = mine_system_atoms_count(error_logs)
344 return atom_count
346# Read and parse a tpr using the MDDB-specific tool from gromacs
347def read_and_parse_tpr (tpr_filepath : str) -> dict:
348 expected_output_filepath = 'siminfo.json'
349 run_gromacs(f'dump -s {tpr_filepath} --json', expected_output_filepath = expected_output_filepath)
350 parsed_tpr = load_json(expected_output_filepath)
351 #remove(expected_output_filepath)
352 return parsed_tpr
354# Read a tpr file by converting it to ASCII
355def get_tpr_content (tpr_filepath : str) -> tuple[str, str]:
356 # Read the tpr file making a 'dump'
357 return run_gromacs(f'dump -s {tpr_filepath}')
359# Regular expresion to mine atom charges
360GROMACS_TPR_ATOM_CHARGES_REGEX = r"q=([0-9e+-. ]*),"
362# Get tpr atom charges
363# This works for the new tpr format (tested in 122)
364def get_tpr_charges (tpr_filepath : str) -> list[float]:
365 # Read the TPR
366 tpr_content, tpr_error_logs = get_tpr_content(tpr_filepath)
367 # Mine the atomic charges
368 charges = []
369 # Iterate tpr content lines
370 for line in tpr_content.split('\n'):
371 # Skip everything which is not atomic charges data
372 if line[0:16] != ' atom': continue
373 # Parse the line to get only charges
374 match = search(GROMACS_TPR_ATOM_CHARGES_REGEX, line)
375 if match: charges.append(float(match[1]))
376 # If we successfully got atom charges then return them
377 if len(charges) > 0: return charges
378 # If there are no charges at the end then something went wrong
379 print(tpr_error_logs)
380 raise RuntimeError(f'Charges extraction from tpr file "{tpr_filepath}" has failed')
382# Regular expresion to mine atom bonds
383GROMACS_TPR_ATOM_BONDS_REGEX = r"^\s*([0-9]*) type=[0-9]* \((BONDS|CONSTR|CONNBONDS)\)\s*([0-9]*)\s*([0-9]*)$"
384# Set a regular expression for SETTLE bonds, used for rigid waters
385# ---- From the paper ------------------------------------------------------------------------------------
386# The SETTLE can be applied to a four-point water model like TIP4P5 which has the fourth point with
387# a certain charge and no mass if the force acting on the fourth point is distributed onto the other three
388# points with masses in a reasonable manner.
389# S. Miyamoto and P.A. Kollman, “SETTLE: An analytical version of the SHAKE and RATTLE algorithms for rigid
390# water models,” J. Comp. Chem., 13 952–962 (1992)
391# --------------------------------------------------------------------------------------------------------
392# So it may happen that we encounter SETTLE with 4 atoms
393# This is not yet supported, but at least we check if this is happening to raise an error when found
394GROMACS_TPR_SETTLE_REGEX = r"^\s*([0-9]*) type=[0-9]* \(SETTLE\)\s*([0-9]*)\s*([0-9]*)\s*([0-9]*)\s*([0-9]*)$"
396# Get tpr atom bonds
397def get_tpr_bonds (tpr_filepath : str) -> list[ tuple[int, int] ]:
398 # Read and parse the TPR
399 parsed_tpr = read_and_parse_tpr(tpr_filepath)
400 # Get the bonds only
401 tpr_bonds = parsed_tpr['gromacs-topology']['bond']['value']
402 return tpr_bonds
404# Filter topology atoms
405# DANI: Note that a TPR file is not a structure but a topology
406# DANI: However it is important that the argument is called 'structure' for the format finder
407def filter_tpr (
408 input_structure_file : 'File',
409 output_structure_file : 'File',
410 input_selection : 'Selection'
411):
412 # Generate a ndx file with the desired selection
413 filter_selection_name = 'filter'
414 filter_index_content = input_selection.to_ndx(selection_name=filter_selection_name)
415 filter_index_filename = '.filter.ndx'
416 with open(filter_index_filename, 'w') as file:
417 file.write(filter_index_content)
419 # Filter the tpr
420 run_gromacs(f'convert-tpr -s {input_structure_file.path} -o {output_structure_file.path} \
421 -n {filter_index_filename}', user_input = filter_selection_name)
423 # Cleanup the index file
424 remove(filter_index_filename)
426filter_tpr.format_sets = [
427 {
428 'inputs': {
429 'input_structure_file': {'tpr'},
430 },
431 'outputs': {
432 'output_structure_file': {'tpr'}
433 }
434 }
435]
437# Join xtc files
438# This is a minimal implementation of 'gmx trjcat' used in loops
439def merge_xtc_files (current_file : str, new_file : str):
440 # If the current file does nt exist then set the new file as the current file
441 if not exists(current_file):
442 rename(new_file, current_file)
443 return
444 # Run trjcat
445 run_gromacs(f'trjcat -f {new_file.path} {current_file} -o {current_file.path}')
447# Generate a ndx file with a selection of frames
448def generate_frames_ndx (frames : list[int], filename : str):
449 # Add a header
450 content = '[ frames ]\n'
451 count = 0
452 for frame in frames:
453 # Add a breakline each 15 indices
454 count += 1
455 if count == 15:
456 content += '\n'
457 count = 0
458 # Add a space between indices
459 # Atom indices go from 0 to n-1
460 # Add +1 to the index since gromacs counts from 1 to n
461 content += str(frame + 1) + ' '
462 content += '\n'
463 # Write the file
464 with open(filename, 'w') as file:
465 file.write(content)
467# DANI: No se usa, pero me costó un rato ponerla a punto así que la conservo
468# Set a function to read and parse xpm files with a single matrix
469# Inspired in https://gromacswrapper.readthedocs.io/en/latest/_modules/gromacs/fileformats/xpm.html#XPM
470def parse_xpm (filename : str) -> list[ list[float] ]:
471 with open(filename) as file:
472 # First lines include metadata such as the title, description and legend
473 # Read lines until we find the start of the array
474 metadata = [file.readline()]
475 while not metadata[-1].startswith("static char *gromacs_xpm[]"):
476 metadata.append(file.readline())
477 # The next line will contain the dimensions of the matrix
478 # e.g. "7 7 14 1",
479 dimensions = file.readline().replace('"','').replace(',','').split()
480 x_dimension, y_dimension, entity_count, x_stride = [ int(i) for i in dimensions ]
481 # Next lines contain every entity definition
482 # Every entity has the following:
483 # - A letter which is used to refer this entity later in the matrix
484 # - A color in #XXXXXX format
485 # - The actual value of the entity
486 # e.g. "A c #FFFFFF " /* "0" */,
487 # e.g. "B c #EBEBFF " /* "1" */,
488 entities = {}
489 for i in range(entity_count):
490 line = file.readline()
491 entity_id = line[1]
492 entity_color = line[6:13]
493 entity_value = line[18:].split('"')[1]
494 entities[entity_id] = { 'value': entity_value, 'color': entity_color }
495 # Next lines are the matrix axis values
496 x_axis = []
497 y_axis = []
498 # Every line has a maximum of 80 labels
499 x_lines = math.ceil(x_dimension / 80)
500 for l in range(x_lines):
501 line = file.readline()[12:-3].split()
502 x_axis += [ int(v) for v in line ]
503 y_lines = math.ceil(y_dimension / 80)
504 for l in range(y_lines):
505 line = file.readline()[12:-3].split()
506 y_axis += [ int(v) for v in line ]
507 # Next lines are the matrix rows
508 matrix = []
509 for l in range(y_dimension):
510 line = file.readline()[1:1+x_dimension]
511 row = [ letter for letter in line ]
512 matrix.append(row)
513 # Check the final matrix size is as expected
514 if len(matrix) != y_dimension:
515 raise ValueError('Different number of rows than expected')
516 sample_row = matrix[-1]
517 if len(sample_row) != x_dimension:
518 raise ValueError('Different number of columns than expected')
519 # Return the output
520 return { 'entities': entities, 'x_axis': x_axis, 'y_axis': y_axis, 'matrix': matrix }
522# Filter atoms in a pdb file
523# This method conserves maximum resolution and chains
524def pdb_filter (
525 input_pdb_filepath : str,
526 output_pdb_filepath : str,
527 index_filepath : str,
528 filter_group_name : str
529):
530 # Filter the PDB
531 run_gromacs(f'editconf -f {input_pdb_filepath} -o {output_pdb_filepath} \
532 -n {index_filepath}', user_input = filter_group_name)
534# Filter atoms in a xtc file
535# Note that here we do not hide the stderr
536# This is because it shows the progress
537# Instead we color the output grey
538def xtc_filter(
539 structure_filepath : str,
540 input_trajectory_filepath : str,
541 output_trajectory_filepath : str,
542 index_filepath : str,
543 filter_group_name : str
544):
545 # Filter the trajectory
546 run_gromacs(f'trjconv -s {structure_filepath} -f {input_trajectory_filepath} \
547 -o {output_trajectory_filepath} -n {index_filepath}', user_input = filter_group_name)
549# Filter atoms in both a pdb and a xtc file
550def tpr_filter(
551 input_tpr_filepath : str,
552 output_tpr_filepath : str,
553 index_filepath : str,
554 filter_group_name : str
555):
556 # Filter the topology
557 run_gromacs(f'convert-tpr -s {input_tpr_filepath} -o {output_tpr_filepath} \
558 -n {index_filepath}', user_input = filter_group_name)
560# Create a .ndx file from a complex mask
561# e.g. no water and no ions -> !"Water"&!"Ion"
562# This will return the group name to be further used
563def make_index (input_structure_file : 'File', output_index_file : 'File', mask : str) -> str:
564 # Make sure the
565 #if output_index_file.exists: output_index_file.remove()
566 # Run Gromacs
567 run_gromacs(f'make_ndx -f {input_structure_file.path} -o {output_index_file.path}',
568 user_input = f'{mask} \nq')
569 # The group name is automatically assigned by gromacs
570 # It equals the mask but removing any " symbols and with a few additional changes
571 # WARNING: Not all mask symbols are supported here, beware of complex masks
572 group_name = mask.replace('"','').replace('&','_&_').replace('|','_')
573 # Check if the group was created
574 content = read_ndx(output_index_file)
575 created = group_name in content
576 return group_name, created
578# Read a .ndx file
579def read_ndx (input_index_file : 'File') -> dict:
580 # Make sure the file exists
581 if not input_index_file.exists:
582 raise RuntimeError(f'Index file {input_index_file.path} does not exist')
583 # Read the ndx file
584 ndx_content = None
585 with open(input_index_file.path, 'r') as file:
586 ndx_content = file.readlines()
587 # Parse its contents
588 groups = {}
589 current_group_name = None
590 current_group_indices = None
591 for line in ndx_content:
592 # If it is the header of a group of indices
593 if line[0] == '[':
594 # Add current group to the final dict
595 if current_group_name != None:
596 groups[current_group_name] = current_group_indices
597 # Set the name and the list of indices for the next group
598 current_group_name = line[2:-3]
599 current_group_indices = []
600 continue
601 # If it is a line of indices then parse it to actual numbers
602 # Substract 1 since gromacs indices are 1-based while we want them 0-based
603 current_group_indices += [ int(index) - 1 for index in line.split() ]
604 # Add last group to the final dict
605 groups[current_group_name] = current_group_indices
606 # Finally return the parsed content
607 return groups
609# Set a function to count atoms in a gromacs supported file
610ATOMS_LINE = r'\n# Atoms\s*([0-9]*)\n'
611def get_atom_count (mysterious_file : 'File') -> int:
612 output_logs, error_logs = run_gromacs(f'check -f {mysterious_file.path}')
613 search_results = search(ATOMS_LINE, error_logs)
614 if not search_results: raise ValueError('Failed to mine atoms')
615 return int(search_results[1])