Coverage for model_workflow/tools/conversions.py: 76%
96 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1from shutil import copyfile
2from typing import List, Optional
3from inspect import getfullargspec
5from model_workflow.utils.formats import get_format_set_suitable_function, get_format_set_suitable_combination
6from model_workflow.utils.file import File
7from model_workflow.utils.vmd_spells import vmd_to_pdb
8from model_workflow.utils.gmx_spells import get_structure, get_structure_alone
9from model_workflow.utils.gmx_spells import merge_and_convert_trajectories as gmx_merge_and_convert_trajectories
10from model_workflow.utils.mdt_spells import merge_and_convert_trajectories as mdt_merge_and_convert_trajectories
11from model_workflow.utils.mdt_spells import merge_and_convert_trajectories_alternative as mdt_merge_and_convert_trajectories_alternative
12from model_workflow.utils.mdt_spells import merge_and_convert_trajectories_unefficient as mdt_merge_and_convert_trajectories_unefficient
13from model_workflow.utils.vmd_spells import merge_and_convert_trajectories as vmd_merge_and_convert_trajectories
14from model_workflow.utils.auxiliar import InputError, warn
# Functions available to perform structure conversions
# Every function must accept the 'input_structure_filename' and 'output_structure_filename' keywords
# Every function must have the 'format_sets' property
# A function may also accept the 'input_trajectory_filename' keyword
structure_converting_functions = [
    get_structure,
    get_structure_alone,
    vmd_to_pdb,
]
# Functions available to perform trajectory conversions
# Every function is called with the 'input_trajectory_filenames' and 'output_trajectory_filename' keywords
# Depending on its format sets, it may be called with the 'input_structure_filename' keyword as well
# Every function must have the 'format_sets' property
trajectory_converting_functions = [
    mdt_merge_and_convert_trajectories,
    gmx_merge_and_convert_trajectories,
    # This should only be used in mdcrd to xtc/trr
    mdt_merge_and_convert_trajectories_alternative,
    vmd_merge_and_convert_trajectories,
    mdt_merge_and_convert_trajectories_unefficient,
]
def convert (
    input_structure_filepath : Optional[str] = '',
    output_structure_filepath : Optional[str] = '',
    input_trajectory_filepaths : Optional[List[str]] = [],
    output_trajectory_filepath : Optional[str] = ''
):
    """
    Handle conversions of different structure and trajectory formats.
    Merge multiple input trajectories into one single output trajectory.

    Every argument is optional, but an output can only be requested together with its input.
    Input trajectories may be passed without an output trajectory, since some structure
    conversions read the first input trajectory in addition to the input structure.

    Args:
        input_structure_filepath: Path to the input structure file, if any.
        output_structure_filepath: Path of the structure file to be written, if any.
        input_trajectory_filepaths: Paths to the input trajectory files.
            All of them must have the same format.
        output_trajectory_filepath: Path of the single trajectory file to be written, if any.

    Returns:
        None. Results are written to the output filepaths.

    Raises:
        InputError: If an output is requested without its input, if an input file does not exist,
            if input trajectories have different formats or if the conversion is not supported.
    """
    # Normalize the input trajectories to a new list, so the default argument is never aliased
    # A missing value (None or empty) becomes an empty list
    # A single string (which should not happen) becomes a list with one path
    if not input_trajectory_filepaths:
        input_trajectory_filepaths = []
    elif isinstance(input_trajectory_filepaths, str):
        input_trajectory_filepaths = [input_trajectory_filepaths]
    else:
        input_trajectory_filepaths = list(input_trajectory_filepaths)

    # If we have output but not input we must complain
    # Note that input trajectories are not mandatory when there is no output trajectory
    if output_structure_filepath and not input_structure_filepath:
        raise InputError('Missing input structure')
    if output_trajectory_filepath and not input_trajectory_filepaths:
        raise InputError('Missing input trajectory')

    # Parse input filepaths to actual files
    # Note that this step automatically raise input errors if any extension is not recognized
    input_structure_file = File(input_structure_filepath)
    output_structure_file = File(output_structure_filepath)
    input_trajectory_files = [ File(path) for path in input_trajectory_filepaths ]
    output_trajectory_file = File(output_trajectory_filepath)

    # Check input files to exist
    input_files = [ input_structure_file ] + input_trajectory_files
    for input_file in input_files:
        if input_file and not input_file.exists:
            raise InputError('Missing input file ' + input_file.path)

    # Check all input trajectory formats are the same
    input_trajectory_formats = { trajectory_file.format for trajectory_file in input_trajectory_files }
    if len(input_trajectory_formats) > 1:
        raise InputError('Input trajectories must have the same format')

    # Get the first trajectory as a sample for those processes which do not require the whole trajectory
    trajectory_sample = input_trajectory_files[0] if input_trajectory_files else File(None)

    # Save created symlinks to remove them at the end of the process
    # From here everything runs inside a try block so symlinks are removed even if the conversion fails
    symlink_files = []
    try:
        # Check if any input file has a non-standard extension of a supported format
        # If so then we create a symlink with the standard extension
        if input_structure_file and input_structure_file.extension != input_structure_file.format:
            input_structure_file = input_structure_file.get_standard_file()
            symlink_files.append(input_structure_file)
        if trajectory_sample and trajectory_sample.extension != trajectory_sample.format:
            # Register every symlink as soon as it is created so none is left behind on failure
            standard_trajectory_files = []
            for trajectory_file in input_trajectory_files:
                standard_trajectory_file = trajectory_file.get_standard_file()
                standard_trajectory_files.append(standard_trajectory_file)
                symlink_files.append(standard_trajectory_file)
            input_trajectory_files = standard_trajectory_files
            trajectory_sample = input_trajectory_files[0]

        # Get file formats
        input_structure_format = input_structure_file.format
        output_structure_format = output_structure_file.format
        input_trajectory_format = trajectory_sample.format
        output_trajectory_format = output_trajectory_file.format

        # Convert the structure
        # Do it inside a function just to return as soon as we are done
        def convert_structure ():
            # If there is no output filename it means we have nothing to do here
            if not output_structure_file:
                return
            # If the input and output names match then we are done
            if input_structure_file.path == output_structure_file.path:
                return
            # If input and output formats are the same then just copy the file with the new name
            if input_structure_format == output_structure_format:
                copyfile(input_structure_file.path, output_structure_file.path)
                return
            print(f'Getting structure in {output_structure_format} format from {input_structure_format} file')
            # Otherwise, we must convert
            # Choose the right conversion function according to input and output formats
            request_format_set = {
                'inputs': {
                    'input_structure_filename': { input_structure_format },
                    'input_trajectory_filename': { input_trajectory_format }
                },
                'outputs': {
                    'output_structure_filename': { output_structure_format }
                }
            }
            suitable = next(get_format_set_suitable_function(
                available_functions=structure_converting_functions,
                available_request_format_sets=[request_format_set],
            ), None)
            # If there is no function to handle this specific conversion we stop here
            if not suitable:
                raise InputError(f'Conversion from {input_structure_format} to {output_structure_format} is not supported')
            converting_function, _ = suitable
            # Find the function keywords
            # This is important since some functions may need a trajectory input in addition
            converting_function_keywords = getfullargspec(converting_function).args
            required_trajectory = 'input_trajectory_filename' in converting_function_keywords
            if required_trajectory:
                if not input_trajectory_files:
                    raise InputError(f'The structure input format {input_structure_format} is missing coordinates and the output format {output_structure_format} needs them. An input trajectory file is required.')
                converting_function(
                    input_structure_filename=input_structure_file.path,
                    input_trajectory_filename=trajectory_sample.path,
                    output_structure_filename=output_structure_file.path
                )
            else:
                converting_function(
                    input_structure_filename=input_structure_file.path,
                    output_structure_filename=output_structure_file.path
                )
        convert_structure()

        # Convert the trajectory
        # Do it inside a function just to return as soon as we are done
        def convert_trajectory ():
            # If there is no output filename it means we have nothing to do here
            if not output_trajectory_file:
                return
            # If the input and output names match then we are done
            trajectory_files_count = len(input_trajectory_files)
            if trajectory_files_count == 1 and trajectory_sample == output_trajectory_file:
                return
            # If there is only 1 input trajectory and it has the same format that the output then just copy the file with the new name
            if trajectory_files_count == 1 and input_trajectory_format == output_trajectory_format:
                copyfile(trajectory_sample.path, output_trajectory_file.path)
                return
            print(f'Converting trajectory format from {input_trajectory_format} to {output_trajectory_format}')
            # Otherwise, we must convert
            # Choose the right conversion function according to input and output formats
            request_format_set = {
                'inputs': {
                    'input_structure_filename': { input_structure_format },
                    'input_trajectory_filenames': { input_trajectory_format }
                },
                'outputs': {
                    'output_trajectory_filename': { output_trajectory_format }
                }
            }
            suitable = next(get_format_set_suitable_function(
                available_functions=trajectory_converting_functions,
                available_request_format_sets=[request_format_set],
            ), None)
            # If there is no function to handle this specific conversion we try to combine several functions in order to do it
            if not suitable:
                warn('There is no function to do the conversion directly. Trying to combine multiple functions...')
                suitable = next(get_format_set_suitable_combination(
                    available_functions=trajectory_converting_functions,
                    available_request_format_sets=[request_format_set],
                ), None)
            # If there is no function to handle this specific conversion we stop here
            if not suitable:
                raise InputError(f'Conversion from {input_trajectory_format} to {output_trajectory_format} is not supported')
            converting_function, formats = suitable
            # Get the input structure expected format
            # False means the function has no input structure argument at all
            expected_input_structure_formats = formats['inputs'].get('input_structure_filename', False)
            # Get the paths of input trajectory files
            trajectory_filepaths = [ trajectory_file.path for trajectory_file in input_trajectory_files ]
            # If the function expects any format then pass the structure
            if expected_input_structure_formats:
                converting_function(
                    input_structure_filename=input_structure_file.path,
                    input_trajectory_filenames=trajectory_filepaths,
                    output_trajectory_filename=output_trajectory_file.path
                )
            # If the function expects None then pass None
            elif expected_input_structure_formats is None:
                converting_function(
                    input_structure_filename=None,
                    input_trajectory_filenames=trajectory_filepaths,
                    output_trajectory_filename=output_trajectory_file.path
                )
            # If the function has not the input structure argument then do not pass it
            else:
                converting_function(
                    input_trajectory_filenames=trajectory_filepaths,
                    output_trajectory_filename=output_trajectory_file.path
                )
        convert_trajectory()
    finally:
        # Remove generated symlinks, if any
        for symlink_file in symlink_files:
            symlink_file.remove()