Coverage for model_workflow/utils/gmx_spells.py: 55%
264 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1from os import remove, rename
2from os.path import exists
3from shutil import copyfile
4from subprocess import run, PIPE, Popen
5from re import search
7from model_workflow.utils.constants import GROMACS_EXECUTABLE, GREY_HEADER, COLOR_END
8from model_workflow.utils.file import File
9from model_workflow.utils.type_hints import *
11from model_workflow.tools.fix_gromacs_masses import fix_gromacs_masses
13# Run a fix for gromacs if not done before
14# Note that this is run always at the moment the code is read, no matter the command or calling origin
15fix_gromacs_masses()
17# Get the first frame from a trajectory
18def get_first_frame (input_structure_filename : str, input_trajectory_filename : str, output_frame_filename : str):
19 # Run Gromacs
20 if input_structure_filename:
21 p = Popen([
22 "echo",
23 "System",
24 ], stdout=PIPE)
25 process = run([
26 GROMACS_EXECUTABLE,
27 "trjconv",
28 "-s",
29 input_structure_filename,
30 "-f",
31 input_trajectory_filename,
32 "-o",
33 output_frame_filename,
34 "-dump",
35 "0",
36 "-quiet"
37 ], stdin=p.stdout, stdout=PIPE, stderr=PIPE)
38 else:
39 process = run([
40 GROMACS_EXECUTABLE,
41 "trjconv",
42 "-f",
43 input_trajectory_filename,
44 "-o",
45 output_frame_filename,
46 "-dump",
47 "0",
48 "-quiet"
49 ], stdout=PIPE, stderr=PIPE)
50 # Make the process run as logs are saved and decoded
51 logs = process.stdout.decode()
52 # If output has not been generated then warn the user
53 if not exists(output_frame_filename):
54 print(logs)
55 error_logs = process.stderr.decode()
56 print(error_logs)
57 raise SystemExit('Something went wrong with Gromacs')
59# Set function supported formats
60get_first_frame.format_sets = [
61 {
62 'inputs': {
63 'input_structure_filename': {'tpr', 'pdb', 'gro'},
64 'input_trajectory_filename': {'xtc', 'trr'}
65 },
66 'outputs': {
67 'output_frame_filename': {'pdb', 'gro'}
68 }
69 },
70 {
71 'inputs': {
72 'input_structure_filename': None,
73 'input_trajectory_filename': {'xtc', 'trr'}
74 },
75 'outputs': {
76 'output_frame_filename': {'xtc', 'trr'}
77 }
78 },
79 {
80 'inputs': {
81 'input_structure_filename': None,
82 'input_trajectory_filename': {'pdb'}
83 },
84 'outputs': {
85 'output_frame_filename': {'pdb', 'xtc', 'trr'}
86 }
87 },
88 {
89 'inputs': {
90 'input_structure_filename': None,
91 'input_trajectory_filename': {'gro'}
92 },
93 'outputs': {
94 'output_frame_filename': {'gro', 'xtc', 'trr'}
95 }
96 }
97]
99# Get the structure using the first frame getter function
100def get_structure (input_structure_filename : str, input_trajectory_filename : str, output_structure_filename : str):
101 get_first_frame(input_structure_filename, input_trajectory_filename, output_structure_filename)
102get_structure.format_sets = [
103 {
104 'inputs': {
105 'input_structure_filename': {'tpr', 'pdb', 'gro'},
106 'input_trajectory_filename': {'xtc', 'trr'}
107 },
108 'outputs': {
109 'output_structure_filename': {'pdb', 'gro'}
110 }
111 }
112]
114# Convert the structure using the first frame getter function (no trajectory is required)
115def get_structure_alone (input_structure_filename : str, output_structure_filename : str):
116 get_first_frame(input_structure_filename, input_structure_filename, output_structure_filename)
117get_structure_alone.format_sets = [
118 {
119 'inputs': {
120 'input_structure_filename': {'pdb', 'gro'},
121 },
122 'outputs': {
123 'output_structure_filename': {'pdb', 'gro'}
124 }
125 }
126]
128# Get gromacs supported trajectories merged and converted to a different format
129def merge_and_convert_trajectories (input_trajectory_filenames : List[str], output_trajectory_filename : str):
130 # Get trajectory formats
131 sample_trajectory = input_trajectory_filenames[0]
132 sample_trajectory_file = File(sample_trajectory)
133 input_trajectories_format = sample_trajectory_file.format
134 output_trajectory_file = File(output_trajectory_filename)
135 output_trajectory_format = output_trajectory_file.format
136 auxiliar_single_trajectory_filename = '.single_trajectory.' + input_trajectories_format
137 # If we have multiple trajectories then join them
138 if len(input_trajectory_filenames) > 1:
139 single_trajectory_filename = auxiliar_single_trajectory_filename
140 logs = run([
141 GROMACS_EXECUTABLE,
142 "trjcat",
143 "-f",
144 *input_trajectory_filenames,
145 "-o",
146 single_trajectory_filename,
147 "-quiet"
148 ], stderr=PIPE).stderr.decode()
149 # If output has not been generated then warn the user
150 if not exists(single_trajectory_filename):
151 print(logs)
152 raise SystemExit('Something went wrong with Gromacs')
153 else:
154 single_trajectory_filename = sample_trajectory
155 # In case input and output formats are different we must convert the trajectory
156 if input_trajectories_format != output_trajectory_format:
157 logs = run([
158 GROMACS_EXECUTABLE,
159 "trjconv",
160 "-f",
161 single_trajectory_filename,
162 "-o",
163 output_trajectory_filename,
164 "-quiet"
165 ], stderr=PIPE).stderr.decode()
166 # If output has not been generated then warn the user
167 if not exists(output_trajectory_filename):
168 print(logs)
169 raise SystemExit('Something went wrong with Gromacs')
170 else:
171 copyfile(single_trajectory_filename, output_trajectory_filename)
172 # Remove residual files
173 if exists(auxiliar_single_trajectory_filename):
174 remove(auxiliar_single_trajectory_filename)
176merge_and_convert_trajectories.format_sets = [
177 {
178 'inputs': {
179 'input_trajectory_filenames': {'xtc', 'trr'}
180 },
181 'outputs': {
182 'output_trajectory_filename': {'xtc', 'trr'}
183 }
184 },
185 {
186 'inputs': {
187 'input_trajectory_filenames': {'pdb'}
188 },
189 'outputs': {
190 'output_trajectory_filename': {'pdb', 'xtc', 'trr'}
191 }
192 },
193 {
194 'inputs': {
195 'input_trajectory_filenames': {'gro'}
196 },
197 'outputs': {
198 'output_trajectory_filename': {'gro', 'xtc', 'trr'}
199 }
200 }
201]
203# Get specific frames from a trajectory
204def get_trajectory_subset (
205 input_trajectory_filename : str,
206 output_trajectory_filename : str,
207 start : int = 0,
208 end : int = None,
209 step : int = 1,
210 frames : List[int] = [],
211 skip : List[int] = []
212):
213 # Set a list with frame indices from
214 output_frames = frames if frames and len(frames) > 0 else [ frame for frame in range(start, end, step) if frame not in skip ]
216 # Generate the ndx file to target the desired frames
217 auxiliar_ndx_filename = '.frames.ndx'
218 generate_frames_ndx(output_frames, auxiliar_ndx_filename)
220 # Now run gromacs trjconv command in order to extract the desired frames
221 p = Popen([
222 "echo",
223 "System",
224 ], stdout=PIPE)
225 logs = run([
226 GROMACS_EXECUTABLE,
227 "trjconv",
228 "-f",
229 input_trajectory_filename,
230 "-o",
231 output_trajectory_filename,
232 "-fr",
233 auxiliar_ndx_filename,
234 "-quiet"
235 ], stdin=p.stdout, stdout=PIPE).stdout.decode()
236 p.stdout.close()
238 # If output has not been generated then warn the user
239 if not exists(output_trajectory_filename):
240 print(logs)
241 raise SystemExit('Something went wrong with Gromacs (main conversion)')
243 # Cleanup the auxiliar ndx file
244 remove(auxiliar_ndx_filename)
247get_trajectory_subset.format_sets = [
248 {
249 'inputs': {
250 'input_trajectory_filename': {'xtc', 'trr'}
251 },
252 'outputs': {
253 'output_trajectory_filename': {'xtc', 'trr'}
254 }
255 }
256]
258# Filter trajectory atoms
259def filter_structure (
260 input_structure_file : 'File',
261 output_structure_file : 'File',
262 input_selection : 'Selection'
263):
264 # Generate a ndx file with the desired selection
265 filter_selection_name = 'filter'
266 filter_index_content = input_selection.to_ndx(selection_name=filter_selection_name)
267 filter_index_filename = '.filter.ndx'
268 with open(filter_index_filename, 'w') as file:
269 file.write(filter_index_content)
271 # Filter the structure
272 print(GREY_HEADER, end='\r')
273 p = Popen([
274 "echo",
275 filter_selection_name,
276 ], stdout=PIPE)
277 logs = run([
278 GROMACS_EXECUTABLE,
279 "editconf",
280 "-f",
281 input_structure_file.path,
282 '-o',
283 output_structure_file.path,
284 '-n',
285 filter_index_filename,
286 '-quiet'
287 ], stdin=p.stdout, stdout=PIPE).stdout.decode()
288 p.stdout.close()
289 print(COLOR_END, end='\r')
291 # Check the output file exists at this point
292 # If not then it means something went wrong with gromacs
293 if not output_structure_file.exists:
294 print(logs)
295 raise SystemExit('Something went wrong with Gromacs')
297 # Cleanup the index file
298 remove(filter_index_filename)
300filter_structure.format_sets = [
301 {
302 'inputs': {
303 'input_structure_file': {'pdb', 'gro'},
304 },
305 'outputs': {
306 'output_structure_file': {'pdb', 'gro'}
307 }
308 }
309]
311# Filter trajectory atoms
312def filter_trajectory (
313 input_structure_file : 'File',
314 input_trajectory_file : 'File',
315 output_trajectory_file : 'File',
316 input_selection : 'Selection'
317):
318 # Generate a ndx file with the desired selection
319 filter_selection_name = 'filter'
320 filter_index_content = input_selection.to_ndx(selection_name=filter_selection_name)
321 filter_index_filename = '.filter.ndx'
322 with open(filter_index_filename, 'w') as file:
323 file.write(filter_index_content)
325 # Filter the trajectory
326 print(GREY_HEADER, end='\r')
327 p = Popen([
328 "echo",
329 filter_selection_name,
330 ], stdout=PIPE)
331 logs = run([
332 GROMACS_EXECUTABLE,
333 "trjconv",
334 "-s",
335 input_structure_file.path,
336 "-f",
337 input_trajectory_file.path,
338 '-o',
339 output_trajectory_file.path,
340 '-n',
341 filter_index_filename,
342 '-quiet'
343 ], stdin=p.stdout, stdout=PIPE).stdout.decode()
344 p.stdout.close()
345 print(COLOR_END, end='\r')
347 # Check the output file exists at this point
348 # If not then it means something went wrong with gromacs
349 if not output_trajectory_file.exists:
350 print(logs)
351 raise SystemExit('Something went wrong with Gromacs')
353 # Cleanup the index file
354 remove(filter_index_filename)
356filter_trajectory.format_sets = [
357 {
358 'inputs': {
359 'input_structure_file': {'tpr', 'pdb', 'gro'},
360 'input_trajectory_file': {'xtc', 'trr'}
361 },
362 'outputs': {
363 'output_trajectory_file': {'xtc', 'trr'}
364 }
365 }
366]
368# Set a regular expression to further mine data from gromacs logs
369GROMACS_SYSTEM_ATOMS_REGEX = r'System\) has[ ]+([0-9]*) elements'
371# Mine system atoms count from gromacs logs
372def mine_system_atoms_count (logs : str) -> int:
373 system_atoms_match = search(GROMACS_SYSTEM_ATOMS_REGEX, logs)
374 if not system_atoms_match:
375 print(logs)
376 raise ValueError('Failed to mine Gromacs error logs')
377 return int(system_atoms_match[1])
379# Count TPR atoms
380def get_tpr_atom_count (tpr_filepath : str) -> int:
381 # Make sure the filepath is valid
382 if not exists(tpr_filepath):
383 raise ValueError('Trying to count atoms from a topology which does not exist')
384 # Run Gromacs only to see the number of atoms in the TPR
385 p = Popen([ "echo", "whatever" ], stdout=PIPE)
386 process = run([
387 GROMACS_EXECUTABLE,
388 "convert-tpr",
389 "-s",
390 tpr_filepath,
391 '-quiet'
392 ], stdin=p.stdout, stdout=PIPE, stderr=PIPE)
393 error_logs = process.stderr.decode()
394 p.stdout.close()
395 # Mine the number of atoms in the system from the logs
396 atom_count = mine_system_atoms_count(error_logs)
397 return atom_count
399# Read a tpr file by converting it to ASCII
400def get_tpr_content (tpr_filepath : str) -> Tuple[str, str]:
401 # Read the tpr file making a 'dump'
402 process = run([
403 GROMACS_EXECUTABLE,
404 "dump",
405 "-s",
406 tpr_filepath,
407 "-quiet"
408 ], stdout=PIPE, stderr=PIPE)
409 return process.stdout.decode(), process.stderr.decode()
411# Regular expresion to mine atom charges
412GROMACS_TPR_ATOM_CHARGES_REGEX = r"q=([0-9e+-. ]*),"
414# Get tpr atom charges
415# This works for the new tpr format (tested in 122)
416def get_tpr_charges (tpr_filepath : str) -> List[float]:
417 # Read the TPR
418 tpr_content, tpr_error_logs = get_tpr_content(tpr_filepath)
419 # Mine the atomic charges
420 charges = []
421 # Iterate tpr content lines
422 for line in tpr_content.split('\n'):
423 # Skip everything which is not atomic charges data
424 if line[0:16] != ' atom': continue
425 # Parse the line to get only charges
426 match = search(GROMACS_TPR_ATOM_CHARGES_REGEX, line)
427 if match: charges.append(float(match[1]))
428 # If we successfully got atom charges then return them
429 if len(charges) > 0: return charges
430 # If there are no charges at the end then something went wrong
431 print(tpr_error_logs)
432 raise RuntimeError(f'Charges extraction from tpr file "{tpr_filepath}" has failed')
434# Regular expresion to mine atom bonds
435GROMACS_TPR_ATOM_BONDS_REGEX = r"^\s*([0-9]*) type=[0-9]* \((BONDS|CONSTR|CONNBONDS)\)\s*([0-9]*)\s*([0-9]*)$"
436# Set a regular expression for SETTLE bonds, used for rigid waters
437# ---- From the paper ------------------------------------------------------------------------------------
438# The SETTLE can be applied to a four-point water model like TIP4P5 which has the fourth point with
439# a certain charge and no mass if the force acting on the fourth point is distributed onto the other three
440# points with masses in a reasonable manner.
441# S. Miyamoto and P.A. Kollman, “SETTLE: An analytical version of the SHAKE and RATTLE algorithms for rigid
442# water models,” J. Comp. Chem., 13 952–962 (1992)
443# --------------------------------------------------------------------------------------------------------
444# So it may happen that we encounter SETTLE with 4 atoms
445# This is not yet supported, but at least we check if this is happening to raise an error when found
446GROMACS_TPR_SETTLE_REGEX = r"^\s*([0-9]*) type=[0-9]* \(SETTLE\)\s*([0-9]*)\s*([0-9]*)\s*([0-9]*)\s*([0-9]*)$"
448# Get tpr atom bonds
449def get_tpr_bonds (tpr_filepath : str) -> List[ Tuple[int, int] ]:
450 # Read the TPR
451 tpr_content, tpr_error_logs = get_tpr_content(tpr_filepath)
452 lines = tpr_content.split('\n')
453 # Mine the atomic bonds
454 bonds = []
455 # Save the moment we already processed a bond of each class (e.g. bonds, constr, connbonds, etc.)
456 processed_bond_classes = set()
457 # Save already processed bond classes which have an index restart
458 # This is a coomon problem in TPR bonding, we do not know the meaning but bonds after the index restart are always wrong
459 restarted_bond_classes = set()
460 # Iterate tpr content lines to find bonds
461 for line in lines:
462 # Parse the line to get only atomic bonds
463 match = search(GROMACS_TPR_ATOM_BONDS_REGEX, line)
464 # If this is not a line with bonding data then skip it
465 if not match: continue
466 # Mine bond index and class
467 bond_index = int(match[1])
468 bond_class = match[2]
469 # If this class has a restart then skip it
470 if bond_class in restarted_bond_classes: continue
471 # If the index of the bond is 0 but we already processed bonds from this classes then this is a restart
472 if bond_index == 0 and bond_class in processed_bond_classes:
473 # If not, then add the class to the restarted classes
474 restarted_bond_classes.add(bond_class)
475 # And skip it
476 continue
477 # Add the class to the list
478 processed_bond_classes.add(bond_class)
479 # Get atom indices of bonded atoms
480 bond = ( int(match[3]), int(match[4]) )
481 bonds.append(bond)
482 # Iterate bonds again to find water bonds with the SETTLE algorithm
483 for line in lines:
484 # Parse the line to get only atomic bonds
485 match = search(GROMACS_TPR_SETTLE_REGEX, line)
486 # If this is not a line with bonding data then skip it
487 if not match: continue
488 # If there is a 4rth atom we stop here
489 if match[5]: raise ValueError('Found SETTLE bonds with 4 atoms. This is not yet supported')
490 # Mine the settle
491 settle = ( int(match[2]), int(match[3]), int(match[4]) )
492 # Set actual bonds from the settles
493 # The oxygen is always declared first
494 # DANI: De esto no estoy 100% seguro, pero Miłosz me dijo que probablemente es así
495 # DANI: Si no se cumple, el test de coherent bonds enconrará un hidrógeno con 2 enlaces
496 bonds += [ (settle[0], settle[1]), (settle[0], settle[2]) ]
497 # If we successfully got atom bonds then return them
498 if len(bonds) > 0: return bonds
499 # If there are no bonds at the end then something went wrong
500 print(tpr_error_logs)
501 raise RuntimeError(f'Bonds extraction from tpr file "{tpr_filepath}" has failed')
503# Filter topology atoms
504# DANI: Note that a TPR file is not a structure but a topology
505# DANI: However it is important that the argument is called 'structure' for the format finder
506def filter_tpr (
507 input_structure_file : 'File',
508 output_structure_file : 'File',
509 input_selection : 'Selection'
510):
511 # Generate a ndx file with the desired selection
512 filter_selection_name = 'filter'
513 filter_index_content = input_selection.to_ndx(selection_name=filter_selection_name)
514 filter_index_filename = '.filter.ndx'
515 with open(filter_index_filename, 'w') as file:
516 file.write(filter_index_content)
518 # Filter the tpr
519 p = Popen([
520 "echo",
521 filter_selection_name,
522 ], stdout=PIPE)
523 logs = run([
524 GROMACS_EXECUTABLE,
525 "convert-tpr",
526 "-s",
527 input_structure_file.path,
528 '-o',
529 output_structure_file.path,
530 '-n',
531 filter_index_filename,
532 '-quiet'
533 ], stdin=p.stdout, stdout=PIPE).stdout.decode()
534 p.stdout.close()
536 # Check the output file exists at this point
537 # If not then it means something went wrong with gromacs
538 if not output_structure_file.exists:
539 print(logs)
540 raise SystemExit('Something went wrong with Gromacs')
542 # Cleanup the index file
543 remove(filter_index_filename)
545filter_tpr.format_sets = [
546 {
547 'inputs': {
548 'input_structure_file': {'tpr'},
549 },
550 'outputs': {
551 'output_structure_file': {'tpr'}
552 }
553 }
554]
556# Join xtc files
557# This is a minimal implementation of 'gmx trjcat' used in loops
558def merge_xtc_files (current_file : str, new_file : str):
559 # If the current file does nt exist then set the new file as the current file
560 if not exists(current_file):
561 rename(new_file, current_file)
562 return
563 # Run trjcat
564 logs = run([
565 GROMACS_EXECUTABLE,
566 "trjcat",
567 "-f",
568 new_file,
569 current_file,
570 '-o',
571 current_file,
572 '-quiet'
573 ],
574 stdout=PIPE,
575 stderr=PIPE
576 ).stdout.decode()
578# Generate a ndx file with a selection of frames
579def generate_frames_ndx (frames : List[int], filename : str):
580 # Add a header
581 content = '[ frames ]\n'
582 count = 0
583 for frame in frames:
584 # Add a breakline each 15 indices
585 count += 1
586 if count == 15:
587 content += '\n'
588 count = 0
589 # Add a space between indices
590 # Atom indices go from 0 to n-1
591 # Add +1 to the index since gromacs counts from 1 to n
592 content += str(frame + 1) + ' '
593 content += '\n'
594 # Write the file
595 with open(filename, 'w') as file:
596 file.write(content)
598# DANI: No se usa, pero me costó un rato ponerla a punto así que la conservo
599# Set a function to read and parse xpm files with a single matrix
600# Inspired in https://gromacswrapper.readthedocs.io/en/latest/_modules/gromacs/fileformats/xpm.html#XPM
601def parse_xpm (filename : str) -> List[ List[float] ]:
602 with open(filename) as file:
603 # First lines include metadata such as the title, description and legend
604 # Read lines until we find the start of the array
605 metadata = [file.readline()]
606 while not metadata[-1].startswith("static char *gromacs_xpm[]"):
607 metadata.append(file.readline())
608 # The next line will contain the dimensions of the matrix
609 # e.g. "7 7 14 1",
610 dimensions = file.readline().replace('"','').replace(',','').split()
611 x_dimension, y_dimension, entity_count, x_stride = [ int(i) for i in dimensions ]
612 # Next lines contain every entity definition
613 # Every entity has the following:
614 # - A letter which is used to refer this entity later in the matrix
615 # - A color in #XXXXXX format
616 # - The actual value of the entity
617 # e.g. "A c #FFFFFF " /* "0" */,
618 # e.g. "B c #EBEBFF " /* "1" */,
619 entities = {}
620 for i in range(entity_count):
621 line = file.readline()
622 entity_id = line[1]
623 entity_color = line[6:13]
624 entity_value = line[18:].split('"')[1]
625 entities[entity_id] = { 'value': entity_value, 'color': entity_color }
626 # Next lines are the matrix axis values
627 x_axis = []
628 y_axis = []
629 # Every line has a maximum of 80 labels
630 x_lines = math.ceil(x_dimension / 80)
631 for l in range(x_lines):
632 line = file.readline()[12:-3].split()
633 x_axis += [ int(v) for v in line ]
634 y_lines = math.ceil(y_dimension / 80)
635 for l in range(y_lines):
636 line = file.readline()[12:-3].split()
637 y_axis += [ int(v) for v in line ]
638 # Next lines are the matrix rows
639 matrix = []
640 for l in range(y_dimension):
641 line = file.readline()[1:1+x_dimension]
642 row = [ letter for letter in line ]
643 matrix.append(row)
644 # Check the final matrix size is as expected
645 if len(matrix) != y_dimension:
646 raise ValueError('Different number of rows than expected')
647 sample_row = matrix[-1]
648 if len(sample_row) != x_dimension:
649 raise ValueError('Different number of columns than expected')
650 # Return the output
651 return { 'entities': entities, 'x_axis': x_axis, 'y_axis': y_axis, 'matrix': matrix }
653# Filter atoms in a pdb file
654# This method conserves maximum resolution and chains
655def pdb_filter (
656 input_pdb_filepath : str,
657 output_pdb_filepath : str,
658 index_filepath : str,
659 filter_group_name : str
660):
661 # Filter the trajectory
662 p = Popen([
663 "echo",
664 filter_group_name,
665 ], stdout=PIPE)
666 process = run([
667 GROMACS_EXECUTABLE,
668 "editconf",
669 "-f",
670 input_pdb_filepath,
671 '-o',
672 output_pdb_filepath,
673 '-n',
674 index_filepath,
675 '-quiet'
676 ], stdin=p.stdout, stdout=PIPE, stderr=PIPE)
677 logs = process.stdout.decode()
678 p.stdout.close()
679 # If output has not been generated then warn the user
680 if not exists(output_pdb_filepath):
681 print(logs)
682 error_logs = process.stderr.decode()
683 print(error_logs)
684 raise SystemExit('Something went wrong with Gromacs while filtering PDB')
686# Filter atoms in a xtc file
687# Note that here we do not hide the stderr
688# This is because it shows the progress
689# Instead we color the output grey
690def xtc_filter(
691 structure_filepath : str,
692 input_trajectory_filepath : str,
693 output_trajectory_filepath : str,
694 index_filepath : str,
695 filter_group_name : str
696):
697 print(GREY_HEADER)
698 # Filter the trajectory
699 p = Popen([
700 "echo",
701 filter_group_name,
702 ], stdout=PIPE)
703 process = run([
704 GROMACS_EXECUTABLE,
705 "trjconv",
706 "-s",
707 structure_filepath,
708 "-f",
709 input_trajectory_filepath,
710 '-o',
711 output_trajectory_filepath,
712 '-n',
713 index_filepath,
714 '-quiet'
715 ], stdin=p.stdout, stdout=PIPE)
716 logs = process.stdout.decode()
717 p.stdout.close()
718 print(COLOR_END)
719 # If output has not been generated then warn the user
720 if not exists(output_trajectory_filepath):
721 print(logs)
722 error_logs = process.stderr.decode()
723 print(error_logs)
724 raise SystemExit('Something went wrong with Gromacs while filtering XTC')
726# Filter atoms in both a pdb and a xtc file
727def tpr_filter(
728 input_tpr_filepath : str,
729 output_tpr_filepath : str,
730 index_filepath : str,
731 filter_group_name : str
732):
733 # Filter the topology
734 p = Popen([
735 "echo",
736 filter_group_name,
737 ], stdout=PIPE)
738 process = run([
739 GROMACS_EXECUTABLE,
740 "convert-tpr",
741 "-s",
742 input_tpr_filepath,
743 '-o',
744 output_tpr_filepath,
745 '-n',
746 index_filepath,
747 '-quiet'
748 ], stdin=p.stdout, stdout=PIPE, stderr=PIPE)
749 logs = process.stdout.decode()
750 p.stdout.close()
751 # If output has not been generated then warn the user
752 if not exists(output_tpr_filepath):
753 print(logs)
754 error_logs = process.stderr.decode()
755 print(error_logs)
756 raise SystemExit('Something went wrong with Gromacs while filtering TPR')
758# Create a .ndx file from a complex mask
759# e.g. no water and no ions -> !"Water"&!"Ion"
760# This will return the group name to be further used
761def make_index (input_structure_file : 'File', output_index_file : 'File', mask : str) -> str:
762 # Run Gromacs
763 p = Popen([
764 "echo",
765 "-e", # Allows the interpretation of '\'
766 mask,
767 '\nq' # Breakline + Save
768 ], stdout=PIPE)
769 process = run([
770 GROMACS_EXECUTABLE,
771 "make_ndx",
772 "-f",
773 input_structure_file.path,
774 '-o',
775 output_index_file.path
776 ], stdin=p.stdout, stdout=PIPE, stderr=PIPE)
777 logs = process.stdout.decode()
778 p.stdout.close()
779 # Check the output file exists at this point
780 # If not then it means something went wrong with gromacs
781 if not output_index_file.exists:
782 print(logs)
783 error_logs = process.stderr.decode()
784 print(error_logs)
785 raise SystemExit('Something went wrong with Gromacs when creating index file')
786 # The group name is automatically assigned by gromacs
787 # It equals the mask but removing andy " symbol
788 group_name = mask.replace('"','')
789 return group_name