Coverage for model_workflow/utils/filters.py: 60%
85 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1from model_workflow.utils.auxiliar import InputError
2from model_workflow.utils.formats import get_format_set_suitable_function
3from model_workflow.utils.gmx_spells import filter_structure, filter_trajectory, filter_tpr
4from model_workflow.utils.pyt_spells import filter_topology
5from model_workflow.utils.structures import Structure
6from model_workflow.utils.type_hints import *
7from model_workflow.utils.file import File
8from model_workflow.tools.conversions import convert
9from inspect import getfullargspec
11# Set functions to performe structure conversions
12# These functions must have 'input_structure_file' and 'output_structure_file' keywords
13# These functions must have the 'format_sets' property
14# These functions may have the 'input_trajectory_file' keyword
15structure_filtering_functions = [ filter_structure, filter_tpr, filter_topology ]
17# Set functions to performe trajectory conversions
18# These functions must have 'input_trajectory_file' and 'output_trajectory_file' keywords
19# These functions must have the 'format_sets' property
20trajectory_filtering_functions = [ filter_trajectory ]
22# Auxiliar PDB file used to instatiate the reference structure sometimes
23AUXILIAR_PDB_FILE = File('.auxiliar.pdb')
25# Handle filterings of different structure and trajectory formats
26def filter_atoms (
27 input_structure_file : Optional['File'] = None,
28 output_structure_file : Optional['File'] = None,
29 input_trajectory_file : Optional['File'] = None,
30 output_trajectory_file : Optional['File'] = None,
31 selection_string : Optional[str] = None,
32 selection_syntax : str = 'vmd'
33):
35 # If we have no output at all then there is nothing to do
36 if not output_structure_file and not output_trajectory_file:
37 raise InputError('No output structure neither output trajectory was specified')
38 # If we have output but not input we must complain
39 if output_structure_file and not input_structure_file:
40 raise InputError('Missing input structure when output structure is requested')
41 if output_trajectory_file and not input_trajectory_file:
42 raise InputError('Missing input trajectory when output trajectory is requested')
44 # Check input files to exist
45 for input_file in [ input_structure_file, input_trajectory_file ]:
46 if input_file and not input_file.exists:
47 raise InputError('Could not find input file ' + input_file.path)
49 # Check we are not missing additional inputs
50 if not selection_string:
51 print('Missing input selection string -> Water and counter ions will be filtered')
52 elif not selection_syntax:
53 raise InputError('Missing input selection syntax')
55 # We need a PDB to instantiate the reference structure
56 # The structure file may also be instantiated from some topology file formats but this is not much reliable
57 # If the input structure is not a PDB then we make a conversion to generate it
58 structure_filepath = input_structure_file.path
59 if input_structure_file.format != 'pdb':
60 # To do so we need coordinates so make sure they were passed
61 if not input_trajectory_file:
62 raise InputError('In order to filter we need coordinates since some atom selections may rely in close atoms.\n' +
63 ' Your input structure is a topology thus not having coordinates.\n' +
64 ' Please provide a trajectory file as well')
65 # Generate a PDB file using both input topology and trajectory
66 structure_filepath = AUXILIAR_PDB_FILE.path
67 convert(input_structure_filepath=input_structure_file.path,
68 output_structure_filepath=structure_filepath,
69 input_trajectory_filepaths=input_trajectory_file.path)
71 # Parse the selection
72 selection = None
73 # Hope the input structure is in a supported format
74 structure = Structure.from_file(structure_filepath)
75 if selection_string:
76 # Parse the input selection
77 selection = structure.select(selection_string, syntax=selection_syntax)
78 # If the selection is empty then war the user
79 if not selection:
80 raise InputError(f'Selection {selection_string} is empty')
81 else:
82 # If the selection is missing then filter out water and ions by default
83 water_selection = structure.select_water()
84 counter_ions_selection = structure.select_counter_ions()
85 filter_selection = water_selection + counter_ions_selection
86 selection = structure.invert_selection(filter_selection)
87 # If the selection is empty then war the user
88 if not selection:
89 raise InputError('There are no water or counter ions')
91 # Remove the axuiliar PDB file, if exists
92 if AUXILIAR_PDB_FILE.exists: AUXILIAR_PDB_FILE.remove()
94 # Check if any input file has an non-standard extension of a supported format
95 # If so then we create a symlink with the standard extension
96 # Save created symlinks to remove them at then of the process
97 symlink_files = []
98 if input_structure_file and input_structure_file.extension != input_structure_file.format:
99 input_structure_file = input_structure_file.get_standard_file()
100 symlink_files.append(input_structure_file)
101 if input_trajectory_file and input_trajectory_file.extension != input_trajectory_file.format:
102 input_trajectory_file = input_trajectory_file.get_standard_file()
103 symlink_files.append(input_trajectory_file)
105 # Filter the structure if an output structure file was provided
106 if output_structure_file:
107 print(f'Filtering structure {input_structure_file.path} to {output_structure_file.path}')
108 # Find a suitable function according to the formats
109 # Note than given the nature of this logic we may encounter a function which converts the file format
110 # This function is not intended for that but this is not a problem either
111 # If the user specifies different input/output formats and the function can do it then go ahead
112 request_format_set = {
113 'inputs': {
114 'input_structure_file': { input_structure_file.format },
115 'input_trajectory_file': { input_trajectory_file.format }
116 },
117 'outputs': {
118 'output_structure_file': { output_structure_file.format }
119 }
120 }
121 suitable = next(get_format_set_suitable_function(
122 available_functions=structure_filtering_functions,
123 available_request_format_sets=[request_format_set],
124 ), None)
125 # If there is no function to handle this specific conversion we stop here
126 if not suitable:
127 if input_structure_file.format == output_structure_file.format:
128 raise InputError(f'Filtering structure files in {input_structure_file.format} format is not supported')
129 else:
130 raise InputError(f'Filtering structure from {input_structure_file.format} to {output_structure_file.format} is not supported')
131 filtering_function, formats = suitable
132 # Find the function keywords
133 # This is important since some functions may need a trajectory input in addition
134 filtering_function_keywords = getfullargspec(filtering_function)[0]
135 required_trajectory = 'input_trajectory_filename' in filtering_function_keywords
136 # If we need a trajectory then pass it as argument as well
137 if required_trajectory:
138 # Make sure an input trajectory was passed
139 if not input_trajectory_file:
140 raise InputError('The structure input format ' + input_structure_file.format +
141 ' is missing coordinates and the output format ' + output_structure_file.format +
142 ' needs them. An input trajectory file is required.')
143 filtering_function(
144 input_structure_file=input_structure_file,
145 input_trajectory_file=input_trajectory_file,
146 output_structure_file=output_structure_file,
147 input_selection=selection
148 )
149 # Otherwise use just the structure as input
150 else:
151 filtering_function(
152 input_structure_file=input_structure_file,
153 output_structure_file=output_structure_file,
154 input_selection=selection
155 )
157 # Filter the trajectory if an output trajectory file was provided
158 if output_trajectory_file:
159 print(f'Filtering trajectory {output_trajectory_file.path} to {output_trajectory_file.path}')
160 # Otherwise, we must convert
161 # Choose the right conversion function according to input and output formats
162 request_format_set = {
163 'inputs': {
164 'input_structure_file': { input_structure_file.format },
165 'input_trajectory_file': { input_trajectory_file.format }
166 },
167 'outputs': {
168 'output_trajectory_file': { output_trajectory_file.format }
169 }
170 }
171 suitable = next(get_format_set_suitable_function(
172 available_functions=trajectory_filtering_functions,
173 available_request_format_sets=[request_format_set],
174 ), None)
175 # If there is no function to handle the filtering then we stop here
176 if not suitable:
177 if input_trajectory_file.format == output_trajectory_file.format:
178 raise InputError(f'Filtering trajectory files in {input_trajectory_file.format} format is not supported')
179 else:
180 raise InputError(f'Filtering trajectory from {input_trajectory_file.format} to {output_trajectory_file.format} is not supported')
181 filtering_function, formats = suitable
182 # Get the input structure expected format
183 expected_input_structure_formats = formats['inputs'].get('input_structure_file', False)
184 # If the function expects an input structure in any format then pass the structure
185 if expected_input_structure_formats:
186 filtering_function(
187 input_structure_file=input_structure_file,
188 input_trajectory_file=input_trajectory_file,
189 output_trajectory_file=output_trajectory_file,
190 input_selection=selection
191 )
192 # If the function expects an input structure as None then pass None
193 elif expected_input_structure_formats == None:
194 filtering_function(
195 input_structure_file=None,
196 input_trajectory_file=input_trajectory_file,
197 output_trajectory_file=output_trajectory_file,
198 input_selection=selection
199 )
200 # If the function does not expect an input structure then do not pass it
201 else:
202 filtering_function(
203 input_trajectory_file=input_trajectory_file,
204 output_trajectory_file=output_trajectory_file,
205 input_selection=selection
206 )
208 # Remove generated symlinks, if any
209 for symlink_file in symlink_files:
210 symlink_file.remove()