Coverage for mddb_workflow / utils / filters.py: 60%
86 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1from mddb_workflow.utils.auxiliar import InputError
2from mddb_workflow.utils.formats import get_format_set_suitable_function
3from mddb_workflow.utils.gmx_spells import filter_structure, filter_trajectory, filter_tpr
4from mddb_workflow.utils.pyt_spells import filter_topology
5from mddb_workflow.utils.structures import Structure
6from mddb_workflow.utils.type_hints import *
7from mddb_workflow.utils.file import File
8from mddb_workflow.tools.conversions import convert
9from inspect import getfullargspec
11# Set functions to performe structure conversions
12# These functions must have 'input_structure_file' and 'output_structure_file' keywords
13# These functions must have the 'format_sets' property
14# These functions may have the 'input_trajectory_file' keyword
15structure_filtering_functions = [ filter_structure, filter_tpr, filter_topology ]
17# Set functions to performe trajectory conversions
18# These functions must have 'input_trajectory_file' and 'output_trajectory_file' keywords
19# These functions must have the 'format_sets' property
20trajectory_filtering_functions = [ filter_trajectory ]
22# Auxiliar PDB file used to instatiate the reference structure sometimes
23AUXILIAR_PDB_FILE = File('.auxiliar.pdb')
25# Handle filterings of different structure and trajectory formats
26def filter_atoms (
27 input_structure_file : Optional['File'] = None,
28 output_structure_file : Optional['File'] = None,
29 input_trajectory_file : Optional['File'] = None,
30 output_trajectory_file : Optional['File'] = None,
31 selection_string : Optional[str] = None,
32 selection_syntax : str = 'vmd'
33):
35 # If we have no output at all then there is nothing to do
36 if not output_structure_file and not output_trajectory_file:
37 raise InputError('No output structure neither output trajectory was specified')
38 # If we have output but not input we must complain
39 if output_structure_file and not input_structure_file:
40 raise InputError('Missing input structure when output structure is requested')
41 if output_trajectory_file and not input_trajectory_file:
42 raise InputError('Missing input trajectory when output trajectory is requested')
44 # Check input files to exist
45 for input_file in [ input_structure_file, input_trajectory_file ]:
46 if input_file and not input_file.exists:
47 raise InputError('Could not find input file ' + input_file.path)
49 # Check we are not missing additional inputs
50 if not selection_string:
51 print('Missing input selection string -> Water and counter ions will be filtered')
52 elif not selection_syntax:
53 raise InputError('Missing input selection syntax')
55 # We need a PDB to instantiate the reference structure
56 # The structure file may also be instantiated from some topology file formats but this is not much reliable
57 # If the input structure is not a PDB then we make a conversion to generate it
58 structure_filepath = input_structure_file.path
59 if input_structure_file.format != 'pdb':
60 # To do so we need coordinates so make sure they were passed
61 if not input_trajectory_file:
62 raise InputError('In order to filter we need coordinates since some atom selections may rely in close atoms.\n' +
63 ' Your input structure is a topology thus not having coordinates.\n' +
64 ' Please provide a trajectory file as well')
65 # Generate a PDB file using both input topology and trajectory
66 structure_filepath = AUXILIAR_PDB_FILE.path
67 convert(input_structure_filepath=input_structure_file.path,
68 output_structure_filepath=structure_filepath,
69 input_trajectory_filepaths=input_trajectory_file.path)
71 # Parse the selection
72 selection = None
73 # Hope the input structure is in a supported format
74 structure = Structure.from_file(structure_filepath)
75 if selection_string:
76 # Parse the input selection
77 selection = structure.select(selection_string, syntax=selection_syntax)
78 # If the selection is empty then war the user
79 if not selection:
80 raise InputError(f'Selection {selection_string} is empty')
81 else:
82 # If the selection is missing then filter out water and ions by default
83 water_selection = structure.select_water()
84 counter_ions_selection = structure.select_counter_ions()
85 filter_selection = water_selection + counter_ions_selection
86 selection = structure.invert_selection(filter_selection)
87 # If the selection is empty then war the user
88 if not selection:
89 raise InputError('There are no water or counter ions')
91 # Remove the axuiliar PDB file, if exists
92 if AUXILIAR_PDB_FILE.exists: AUXILIAR_PDB_FILE.remove()
94 # Check if any input file has an non-standard extension of a supported format
95 # If so then we create a symlink with the standard extension
96 # Save created symlinks to remove them at then of the process
97 symlink_files = []
98 if input_structure_file and input_structure_file.extension != input_structure_file.format:
99 input_structure_file = input_structure_file.get_standard_file()
100 symlink_files.append(input_structure_file)
101 if input_trajectory_file and input_trajectory_file.extension != input_trajectory_file.format:
102 input_trajectory_file = input_trajectory_file.get_standard_file()
103 symlink_files.append(input_trajectory_file)
105 # Filter the structure if an output structure file was provided
106 if output_structure_file:
107 print(f'Filtering structure {input_structure_file.path} to {output_structure_file.path}')
108 # Find a suitable function according to the formats
109 # Note than given the nature of this logic we may encounter a function which converts the file format
110 # This function is not intended for that but this is not a problem either
111 # If the user specifies different input/output formats and the function can do it then go ahead
112 input_trajectory_format = input_trajectory_file.format if input_trajectory_file else None
113 request_format_set = {
114 'inputs': {
115 'input_structure_file': { input_structure_file.format },
116 'input_trajectory_file': { input_trajectory_format }
117 },
118 'outputs': {
119 'output_structure_file': { output_structure_file.format }
120 }
121 }
122 suitable = next(get_format_set_suitable_function(
123 available_functions=structure_filtering_functions,
124 available_request_format_sets=[request_format_set],
125 ), None)
126 # If there is no function to handle this specific conversion we stop here
127 if not suitable:
128 if input_structure_file.format == output_structure_file.format:
129 raise InputError(f'Filtering structure files in {input_structure_file.format} format is not supported')
130 else:
131 raise InputError(f'Filtering structure from {input_structure_file.format} to {output_structure_file.format} is not supported')
132 filtering_function, formats = suitable
133 # Find the function keywords
134 # This is important since some functions may need a trajectory input in addition
135 filtering_function_keywords = getfullargspec(filtering_function)[0]
136 required_trajectory = 'input_trajectory_filename' in filtering_function_keywords
137 # If we need a trajectory then pass it as argument as well
138 if required_trajectory:
139 # Make sure an input trajectory was passed
140 if not input_trajectory_file:
141 raise InputError('The structure input format ' + input_structure_file.format +
142 ' is missing coordinates and the output format ' + output_structure_file.format +
143 ' needs them. An input trajectory file is required.')
144 filtering_function(
145 input_structure_file=input_structure_file,
146 input_trajectory_file=input_trajectory_file,
147 output_structure_file=output_structure_file,
148 input_selection=selection
149 )
150 # Otherwise use just the structure as input
151 else:
152 filtering_function(
153 input_structure_file=input_structure_file,
154 output_structure_file=output_structure_file,
155 input_selection=selection
156 )
158 # Filter the trajectory if an output trajectory file was provided
159 if output_trajectory_file:
160 print(f'Filtering trajectory {input_trajectory_file.path} to {output_trajectory_file.path}')
161 # Otherwise, we must convert
162 # Choose the right conversion function according to input and output formats
163 request_format_set = {
164 'inputs': {
165 'input_structure_file': { input_structure_file.format },
166 'input_trajectory_file': { input_trajectory_file.format }
167 },
168 'outputs': {
169 'output_trajectory_file': { output_trajectory_file.format }
170 }
171 }
172 suitable = next(get_format_set_suitable_function(
173 available_functions=trajectory_filtering_functions,
174 available_request_format_sets=[request_format_set],
175 ), None)
176 # If there is no function to handle the filtering then we stop here
177 if not suitable:
178 if input_trajectory_file.format == output_trajectory_file.format:
179 raise InputError(f'Filtering trajectory files in {input_trajectory_file.format} format is not supported')
180 else:
181 raise InputError(f'Filtering trajectory from {input_trajectory_file.format} to {output_trajectory_file.format} is not supported')
182 filtering_function, formats = suitable
183 # Get the input structure expected format
184 expected_input_structure_formats = formats['inputs'].get('input_structure_file', False)
185 # If the function expects an input structure in any format then pass the structure
186 if expected_input_structure_formats:
187 filtering_function(
188 input_structure_file=input_structure_file,
189 input_trajectory_file=input_trajectory_file,
190 output_trajectory_file=output_trajectory_file,
191 input_selection=selection
192 )
193 # If the function expects an input structure as None then pass None
194 elif expected_input_structure_formats == None:
195 filtering_function(
196 input_structure_file=None,
197 input_trajectory_file=input_trajectory_file,
198 output_trajectory_file=output_trajectory_file,
199 input_selection=selection
200 )
201 # If the function does not expect an input structure then do not pass it
202 else:
203 filtering_function(
204 input_trajectory_file=input_trajectory_file,
205 output_trajectory_file=output_trajectory_file,
206 input_selection=selection
207 )
209 # Remove generated symlinks, if any
210 for symlink_file in symlink_files:
211 symlink_file.remove()