Coverage for model_workflow/utils/formats.py: 75%
198 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1import os
2from typing import Optional, List, Tuple, Callable, Generator
3from inspect import getfullargspec
def get_format (filename : Optional[str]) -> Optional[str]:
    """
    Get a filename format, i.e. the text after the last dot in the filename.

    Args:
        filename: File name or path. It may be None or an empty string.

    Returns:
        The text after the last '.' (e.g. 'pdb' for 'structure.pdb').
        None when the filename is None or empty.
        Note that a filename with no dot at all is returned whole.
    """
    # Missing filenames have no format
    if not filename:
        return None
    # Only the last dot matters, so a single split from the right is enough
    return filename.rsplit('.', 1)[-1]
def get_format_set_suitable_function (
    available_functions : List[Callable],
    available_request_format_sets : List[dict],
) -> Generator[ Optional[ Tuple[ Callable, dict ]], None, None ]:
    """
    Yield every function which is suitable for any of the available request "format sets".

    Request format sets are tried in order. For each of them every function is tried in order,
    and for each function every one of its format sets is tried in order.
    Each match is yielded as a tuple with the function and a newly generated format set
    which contains only the formats in common between the request and the function.
    When there is no suitable function the generator simply yields nothing.

    WARNING: Available functions must have the 'format_sets' property.
    Format sets are dictionaries which specify input and output formats.
    Consider function format sets as 'required input' and 'available output'.
    Consider request format sets as 'available input' and 'required output'.
    Both inputs and outputs are dictionaries where keys are function arguments
    and values are sets of supported formats.
    Alternatively, an argument may have None as value to represent unnecessary requirements
    or missing availabilities.

    An example is shown below:
    {
        'inputs': {
            'input_structure_filename': {'tpr'},
            'input_trajectory_filenames': {'xtc', 'trr'},
        },
        'outputs': {
            'output_trajectory_filename': {'pdb', 'gro'}
        },
    }

    Functions may have multiple format sets since different input formats
    may lead to different output formats.

    Raises:
        SystemExit: When the argument names of a function format set and a request format set
            are not compatible.
    """
    # Debugging hint: print available_request_format_sets and every function 'format_sets' here
    for request in available_request_format_sets:
        for candidate in available_functions:
            for offered in candidate.format_sets:
                # Argument names must be compatible before comparing any format
                if not check_format_sets_compability(request, offered):
                    raise SystemExit('Format keys are not compatible with function ' + str(candidate.__name__))
                # What the function requires as input must be available in the request
                matched_inputs = get_common_argument_formats(
                    offered.get('inputs', None),
                    request.get('inputs', None),
                )
                # A falsy result means some input has no format in common
                if not matched_inputs:
                    continue
                # What the request requires as output must be available in the function
                matched_outputs = get_common_argument_formats(
                    request.get('outputs', None),
                    offered.get('outputs', None),
                )
                # Only yield when the outputs match as well
                if matched_outputs:
                    yield candidate, { 'inputs': matched_inputs, 'outputs': matched_outputs }
def get_common_argument_formats (required_arguments : Optional[dict], available_arguments : Optional[dict]):
    """
    Get compatible formats between two groups of arguments.
    All required argument formats must be fulfilled by the available argument formats.
    Arguments are defined as dictionaries with argument names as keys and sets of formats as values.
    e.g.

    {
        'input_structure_filename': {'tpr'},
        'input_trajectory_filenames': {'xtc', 'trr'},
    }

    Args:
        required_arguments: Required formats for every argument. A None value for an argument
            means there is no format requirement for it.
        available_arguments: Available formats for every argument.

    Returns:
        - The set {'not_required'} when there are no required arguments at all.
          It is a truthy placeholder so callers checking the result truthiness take it as a match.
        - None when the requirements cannot be fulfilled: there are no available arguments,
          a required argument is missing (or None) among the available arguments,
          or a required argument has no format in common.
        - Otherwise, a dictionary with the required argument names as keys and, as values,
          the set of formats in common (or None for arguments with no format requirement).
    """
    # If there are no required arguments then there is nothing to fulfil
    # Note that a truthy placeholder is returned, not an empty dictionary
    if not required_arguments:
        return {'not_required'}
    # If there are no available arguments then requirements cannot be fulfilled
    if not available_arguments:
        return None
    # Set a dictionary with the same keys as the required arguments including only the common formats
    common_argument_formats = {}
    for required_argument, required_formats in required_arguments.items():
        # If there is no format required for this argument we set None as its common formats
        if required_formats is None:
            common_argument_formats[required_argument] = None
            continue
        # Get the available formats for the required argument
        # DANI: careful here, the 'get' may not have been necessary
        available_formats = available_arguments.get(required_argument, None)
        # If the available arguments are missing this required argument then there is no match
        if available_formats is None:
            return None
        # Find the formats in common between both the required and the available arguments
        common_formats = available_formats.intersection(required_formats)
        # If there is no format in common for this required argument then there is no match
        if not common_formats:
            return None
        common_argument_formats[required_argument] = common_formats
    return common_argument_formats
def check_format_sets_compability (request_format_set : dict, function_format_set : dict) -> bool:
    """
    Check the argument names of two format sets to be compatible.
    Only argument names (dictionary keys) are checked here, not the formats themselves.
    - Every function format set input argument must exist among the request format set inputs.
    - Every request format set output argument must exist among the function format set outputs.
    An error message is printed for the first missing piece found.

    Returns:
        True if both conditions are met, False otherwise.
    """
    # Each check is defined as: who demands arguments, who must offer them,
    # the format set section to look at and the message when the whole section is missing
    checks = (
        (function_format_set, request_format_set, 'inputs', 'ERROR: Missing inputs'),
        (request_format_set, function_format_set, 'outputs', 'ERROR: Missing outputs'),
    )
    for demanding_format_set, offering_format_set, section, missing_section_message in checks:
        demanded = demanding_format_set.get(section, None)
        # If nothing is demanded in this section then there is nothing to check
        if not demanded:
            continue
        demanded_arguments = demanded.keys()
        offered = offering_format_set.get(section, None)
        if not offered:
            print(missing_section_message)
            return False
        offered_arguments = offered.keys()
        # Every demanded argument name must be offered
        for argument in demanded_arguments:
            if argument not in offered_arguments:
                print('ERROR: Missing ' + argument + ' argument')
                return False
    return True
151# WARNING: This function makes only sets for those functions whose output can be reused as input of others
152# WARNING: i.e. functions which return structure/trajectory files
153# WARNING: This function should be called only when "get_format_set_suitable_function" has failed
def get_format_set_suitable_combination (
    available_functions : List[Callable],
    available_request_format_sets : List[dict],
) -> Generator[ Tuple[ Callable, dict ], None, None ]:
    """
    Find chains of functions which, run one after another, fulfil any of the request "format sets".

    Request format sets are tried in order. For each of them, every chain of functions whose
    first function accepts the request inputs and whose last function provides the request
    outputs is combined into one single function.

    Yields:
        Tuples with the combined function and its format set.
        The combined format set has the request inputs as 'inputs' and the formats in common
        between the request outputs and the last function outputs as 'outputs'.
        The combined function also has this format set in its 'format_sets' property.
        The combined function accepts the arguments 'input_structure_filename',
        'input_trajectory_filenames', 'output_structure_filename' and 'output_trajectory_filename'.

    Raises:
        ValueError: When the argument names of a function format set and a request format set
            are not compatible.

    WARNING: Only structure/trajectory outputs are reused as inputs of further functions.
    WARNING: Available functions must have the 'format_sets' property.
    """
    # Try with each request format set
    for request_format_set in available_request_format_sets:
        first_available_inputs = request_format_set.get('inputs', None)
        # Get every possible chain of functions and combine each chain into one single function
        combinations = _iter_function_combinations(
            available_functions, request_format_set, [], [], first_available_inputs)
        for functions, function_common_inputs, last_common_outputs in combinations:
            # The combined function is built by a separate function so it keeps its own chain
            # Otherwise it would read the loop variables, which change as this generator advances
            combined_function = _combine_functions(functions, function_common_inputs)
            # Set the combined function format set
            combined_format_set = {
                'inputs': first_available_inputs,
                'outputs': last_common_outputs
            }
            combined_function.format_sets = [combined_format_set]
            yield combined_function, combined_format_set

def _get_argument_formats (arguments : Optional[dict], argument_name : str) -> set:
    """Get a new set with the formats of an argument, which is empty if the argument is missing or None."""
    if not isinstance(arguments, dict):
        return set()
    return set(arguments.get(argument_name, None) or ())

def _iter_function_combinations (
    available_functions : List[Callable],
    request_format_set : dict,
    current_functions : List[Callable],
    current_function_common_inputs : List[dict],
    available_inputs : Optional[dict],
) -> Generator[ Tuple[ List[Callable], list, dict ], None, None ]:
    """
    Recursively find chains of functions which lead from the available inputs to the request outputs.

    Args:
        available_functions: Functions to chain. They must have the 'format_sets' property.
        request_format_set: The request format set to be fulfilled.
        current_functions: Functions already in the chain.
        current_function_common_inputs: Common input formats of every function already in the chain.
        available_inputs: Formats available as inputs at this point of the chain.

    Yields:
        Tuples with the list of chained functions, the list with the common input formats
        of every chained function and the common output formats of the last function.
    """
    required_outputs = request_format_set.get('outputs', None)
    # Search functions to match formats for every required argument
    for function in available_functions:
        # Test every function format set independently
        for function_format_set in function.format_sets:
            # Check format keys are compatible
            if not check_format_sets_compability(request_format_set, function_format_set):
                raise ValueError('Format keys are not compatible with function ' + str(function.__name__))
            # Check the function inputs to be fulfilled by the available inputs
            required_inputs = function_format_set.get('inputs', None)
            common_inputs = get_common_argument_formats(required_inputs, available_inputs)
            # A falsy result means some input has no format in common
            if not common_inputs:
                continue
            # Add the current function to the chain
            new_functions = [ *current_functions, function ]
            new_function_common_inputs = [ *current_function_common_inputs, common_inputs ]
            # Check the request outputs to be fulfilled by the function outputs
            available_outputs = function_format_set.get('outputs', None)
            common_outputs = get_common_argument_formats(required_outputs, available_outputs)
            # If there are common outputs then we have found a successful combination
            if common_outputs:
                yield new_functions, new_function_common_inputs, common_outputs
            # Try to make another jump in any case
            # Merge current available inputs with the current function available outputs
            # Then, using all these formats as available inputs, try to find another function
            # Note that arguments may be missing or None, which stands for no formats
            current_structure_inputs = _get_argument_formats(available_inputs, 'input_structure_filename')
            current_trajectory_inputs = _get_argument_formats(available_inputs, 'input_trajectory_filenames')
            new_structure_inputs = _get_argument_formats(available_outputs, 'output_structure_filename')
            new_trajectory_inputs = _get_argument_formats(available_outputs, 'output_trajectory_filename')
            next_structure_inputs = current_structure_inputs.union(new_structure_inputs)
            next_trajectory_inputs = current_trajectory_inputs.union(new_trajectory_inputs)
            # If current function did not add new formats to the already available formats then this step was useless
            # In this case, stop here. This also guarantees the recursion to finish.
            if len(next_structure_inputs) == len(current_structure_inputs) and len(next_trajectory_inputs) == len(current_trajectory_inputs):
                continue
            # Build the new available inputs dictionary
            new_available_inputs = {
                'input_structure_filename': next_structure_inputs,
                'input_trajectory_filenames': next_trajectory_inputs
            }
            # Since we have new available input formats, find further functions to get the desired output format
            yield from _iter_function_combinations(
                available_functions, request_format_set,
                new_functions, new_function_common_inputs, new_available_inputs)

def _combine_functions (functions : List[Callable], function_common_inputs : list) -> Callable:
    """
    Combine a chain of functions into one single function which runs them in order.

    Args:
        functions: Functions to be run, in order.
        function_common_inputs: Common input formats of every function, in the same order.
            They are used to know which format every function must output for the next one.

    Returns:
        The combined function. Intermediate files are written as hidden auxiliar files
        in the current working directory and removed once the combined function is over.
    """
    # Copy both lists so the combined function is not affected by further changes in them
    functions = list(functions)
    function_common_inputs = list(function_common_inputs)
    last_function_index = len(functions) - 1

    def combined_function (
        input_structure_filename : Optional[str] = None,
        input_trajectory_filenames : Optional[List[str]] = None,
        output_structure_filename : Optional[str] = None,
        output_trajectory_filename : Optional[str] = None
    ):
        auxiliar_filenames = []
        available_structure_filenames = [ input_structure_filename ]
        available_trajectory_filenames = [ input_trajectory_filenames ] # This is a list of lists
        current_input_structure_filename = input_structure_filename
        current_input_trajectory_filenames = input_trajectory_filenames
        try:
            for i, function in enumerate(functions):
                already_existing_structure = None
                already_existing_trajectories = None
                current_output_structure_filename = None
                current_output_trajectory_filename = None
                if i < last_function_index:
                    # Get the next function common inputs in order to know what format we must output
                    # They may be a placeholder instead of a dictionary when the next function requires no inputs
                    next_function_common_inputs = function_common_inputs[i + 1]
                    if not isinstance(next_function_common_inputs, dict):
                        next_function_common_inputs = {}
                    # Find the formats for the outputs. Use the first common format to do so
                    # Note that formats are sets, so which format comes first is arbitrary
                    # First select the structure format
                    structure_formats = next_function_common_inputs.get('input_structure_filename', None)
                    if structure_formats:
                        output_structure_format = list(structure_formats)[0]
                        # Output no structure if there is a structure with the desired format already
                        # Otherwise, create it using an auxiliar filename
                        already_existing_structure = next(
                            ( structure for structure in available_structure_filenames
                                if get_format(structure) == output_structure_format ),
                            None
                        )
                        if not already_existing_structure:
                            current_output_structure_filename = '.structure.' + output_structure_format
                            auxiliar_filenames.append(current_output_structure_filename)
                    # Then select the trajectory format
                    trajectory_formats = next_function_common_inputs.get('input_trajectory_filenames', None)
                    if trajectory_formats:
                        output_trajectory_format = list(trajectory_formats)[0]
                        # Output no trajectory if there are trajectories with the desired format already
                        # Otherwise, create it using an auxiliar filename
                        # Missing (None) or empty trajectories are skipped since they have no format
                        already_existing_trajectories = next(
                            ( trajectories for trajectories in available_trajectory_filenames
                                if trajectories and get_format(trajectories[0]) == output_trajectory_format ),
                            None
                        )
                        if not already_existing_trajectories:
                            current_output_trajectory_filename = '.trajectory.' + output_trajectory_format
                            auxiliar_filenames.append(current_output_trajectory_filename)
                # In case this is the last function use the final output filenames
                else:
                    current_output_structure_filename = output_structure_filename
                    current_output_trajectory_filename = output_trajectory_filename
                # Set the arguments to be passed to the function
                # This has to be anticipated since we cannot pass an argument the function does not expect
                converting_function_arguments = getfullargspec(function)[0]
                candidate_arguments = {
                    'input_structure_filename': current_input_structure_filename,
                    'input_trajectory_filenames': current_input_trajectory_filenames,
                    'output_structure_filename': current_output_structure_filename,
                    'output_trajectory_filename': current_output_trajectory_filename,
                }
                passing_arguments = {
                    name: value for name, value in candidate_arguments.items()
                    if name in converting_function_arguments
                }
                # Execute the current function
                function(**passing_arguments)
                # Now set the inputs for the next function
                # Also update the available structure/trajectory files in case a further function wants to reuse them
                if already_existing_structure:
                    current_input_structure_filename = already_existing_structure
                else:
                    current_input_structure_filename = current_output_structure_filename
                    available_structure_filenames.append(current_output_structure_filename)
                if already_existing_trajectories:
                    current_input_trajectory_filenames = already_existing_trajectories
                else:
                    current_input_trajectory_filenames = [ current_output_trajectory_filename ]
                    available_trajectory_filenames.append([ current_output_trajectory_filename ])
        finally:
            # Remove auxiliar files, also when any of the functions failed
            # A file may not exist if the function expected to write it failed before
            for auxiliar_filename in auxiliar_filenames:
                if os.path.exists(auxiliar_filename):
                    os.remove(auxiliar_filename)

    return combined_function
326# Structure file formats
def is_pdb (filename : str) -> bool:
    """Check if a filename ends with the '.pdb' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.pdb')
def is_psf (filename : str) -> bool:
    """Check if a filename ends with the '.psf' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.psf')
def is_tpr (filename : str) -> bool:
    """Check if a filename ends with the '.tpr' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.tpr')
def is_gro (filename : str) -> bool:
    """Check if a filename ends with the '.gro' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.gro')
def is_prmtop (filename : str) -> bool:
    """Check if a filename ends with the '.prmtop' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.prmtop')
def is_top (filename : str) -> bool:
    """Check if a filename ends with the '.top' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.top')
345# Trajectory file formats
def is_xtc (filename : str) -> bool:
    """Check if a filename ends with the '.xtc' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.xtc')
def is_dcd (filename : str) -> bool:
    """Check if a filename ends with the '.dcd' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.dcd')
def is_netcdf (filename : str) -> bool:
    """Check if a filename ends with the '.nc' extension (case-sensitive)."""
    # Using 'endswith' avoids a slice length which must be kept in sync with the extension
    return filename.endswith('.nc')
def are_xtc (filenames : list) -> bool:
    """Check if every filename in a list has the xtc format. An empty list results in True."""
    # A generator is used so no intermediate list is built and the check stops at the first mismatch
    return all(is_xtc(filename) for filename in filenames)
def are_dcd (filenames : list) -> bool:
    """Check if every filename in a list has the dcd format. An empty list results in True."""
    # A generator is used so no intermediate list is built and the check stops at the first mismatch
    return all(is_dcd(filename) for filename in filenames)
def are_netcdf (filenames : list) -> bool:
    """Check if every filename in a list has the netcdf format. An empty list results in True."""
    # A generator is used so no intermediate list is built and the check stops at the first mismatch
    return all(is_netcdf(filename) for filename in filenames)
365# Extra formats logic
367# Check if a file may be read by pytraj according to its format
def is_pytraj_supported (filename : str) -> bool:
    """Check if a filename has any of the prmtop, top or psf formats."""
    # Format checkers are tried in order and the search stops at the first match
    format_checkers = (is_prmtop, is_top, is_psf)
    return any(has_format(filename) for has_format in format_checkers)
371# From GitHub:
372# ParmFormatDict = {
373# "AMBERPARM": AMBERPARM,
374# "PDBFILE": PDBFILEPARM,
375# "MOL2FILE": MOL2FILEPARM,
376# "CHARMMPSF": CHARMMPSF,
377# "CIFFILE": CIFFILE,
378# "GMXTOP": GMXTOP,
379# "SDFFILE": SDFFILE,
380# "TINKER": TINKERPARM,
381# "UNKNOWN_PARM": UNKNOWN_PARM,
382# }
def get_pytraj_parm_format (filename : str) -> str:
    """
    Get the pytraj format key for the write_parm function for a specific file according to its format.

    Returns:
        'AMBERPARM' for prmtop files, 'CHARMMPSF' for psf files,
        'GMXTOP' for top files and 'PDBFILE' for pdb files.

    Raises:
        ValueError: When the file has none of the formats above.
    """
    # Format checkers paired with their pytraj format key
    # They are tried in this order and the first match wins
    format_keys = (
        (is_prmtop, 'AMBERPARM'),
        (is_psf, 'CHARMMPSF'),
        (is_top, 'GMXTOP'),
        (is_pdb, 'PDBFILE'),
    )
    for has_format, format_key in format_keys:
        if has_format(filename):
            return format_key
    raise ValueError('The file ' + filename + ' format is not supported')