Coverage for mddb_workflow/tools/conversions.py: 79%
91 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1from typing import Optional
2from inspect import getfullargspec
4from mddb_workflow.utils.formats import get_format_set_suitable_function, get_format_set_suitable_combination
5from mddb_workflow.utils.file import File
6from mddb_workflow.utils.vmd_spells import vmd_to_pdb
7from mddb_workflow.utils.gmx_spells import get_structure, get_structure_alone
8from mddb_workflow.utils.gmx_spells import merge_and_convert_trajectories as gmx_merge_and_convert_trajectories
9from mddb_workflow.utils.mdt_spells import merge_and_convert_trajectories as mdt_merge_and_convert_trajectories
10from mddb_workflow.utils.mdt_spells import merge_and_convert_trajectories_alternative as mdt_merge_and_convert_trajectories_alternative
11from mddb_workflow.utils.mdt_spells import merge_and_convert_trajectories_unefficient as mdt_merge_and_convert_trajectories_unefficient
12from mddb_workflow.utils.vmd_spells import merge_and_convert_trajectories as vmd_merge_and_convert_trajectories
13from mddb_workflow.utils.auxiliar import InputError, warn
# Functions available to perform structure conversions.
# Every function listed here must accept the 'input_structure_filename' and
# 'output_structure_filename' keywords and must have the 'format_sets' property.
# A function may additionally accept the 'input_trajectory_filename' keyword.
structure_converting_functions = [
    get_structure,
    get_structure_alone,
    vmd_to_pdb,
]
# Functions available to perform trajectory conversions (and merging).
# Every function listed here is called with the 'input_trajectory_filenames' and
# 'output_trajectory_filename' keywords, and optionally 'input_structure_filename'.
# Every function listed here must have the 'format_sets' property.
# NOTE(review): the list order is preserved on purpose; it presumably sets the
# priority among suitable functions -- confirm before reordering.
trajectory_converting_functions = [
    mdt_merge_and_convert_trajectories,
    gmx_merge_and_convert_trajectories,
    # This should only be used in mdcrd to xtc/trr
    mdt_merge_and_convert_trajectories_alternative,
    vmd_merge_and_convert_trajectories,
    mdt_merge_and_convert_trajectories_unefficient,
]
def convert(
    input_structure_filepath: Optional[str] = '',
    output_structure_filepath: Optional[str] = '',
    input_trajectory_filepaths: Optional[list[str]] = None,
    output_trajectory_filepath: Optional[str] = '',
):
    """Convert structure and/or trajectory files between formats.

    When several input trajectories are given they are merged into the single
    output trajectory. Each output is only generated when its output path is
    provided. Nothing is returned: results are written to the output paths.

    Args:
        input_structure_filepath: Path to the input structure, if any.
        output_structure_filepath: Path of the structure to generate, if any.
        input_trajectory_filepaths: Paths to the input trajectories. A single
            string is tolerated and handled as a one-element list.
        output_trajectory_filepath: Path of the trajectory to generate, if any.

    Raises:
        InputError: If an output is requested without its matching input, if
            an input file does not exist, if the input trajectories do not all
            share one format, if a structure conversion needs a trajectory and
            none was given, or if the conversion is not supported. According to
            the original comments, building a File from a path with an
            unrecognized extension raises an input error as well.
    """
    # Normalize the trajectory paths so the rest of the function can rely on a list
    # A None default is used instead of a shared mutable list
    if input_trajectory_filepaths is None:
        input_trajectory_filepaths = []
    elif isinstance(input_trajectory_filepaths, str):
        # A single string should not happen, but fix it if it does
        input_trajectory_filepaths = [input_trajectory_filepaths] if input_trajectory_filepaths else []

    # If we have output but not input we must complain
    # Note that missing trajectories are only an error when a trajectory output is requested
    # This way structure-only conversions are allowed
    if output_structure_filepath and not input_structure_filepath:
        raise InputError('Missing input structure')
    if output_trajectory_filepath and not input_trajectory_filepaths:
        raise InputError('Missing input trajectory')

    # Parse input filepaths to actual files
    input_structure_file = File(input_structure_filepath)
    output_structure_file = File(output_structure_filepath)
    input_trajectory_files = [File(path) for path in input_trajectory_filepaths]
    output_trajectory_file = File(output_trajectory_filepath)

    # Check input files to exist
    for input_file in [input_structure_file, *input_trajectory_files]:
        if input_file and not input_file.exists:
            raise InputError('Missing input file ' + input_file.path)

    # Check all input trajectory formats are the same
    input_trajectory_formats = {trajectory_file.format for trajectory_file in input_trajectory_files}
    if len(input_trajectory_formats) > 1:
        raise InputError('Input trajectories must have the same format')

    # Save created symlinks to remove them at the end of the process
    # The cleanup is in a 'finally' so symlinks are removed also when a conversion fails
    symlink_files = []
    try:
        # Check if any input file has a non-standard extension of a supported format
        # If so then we create a symlink with the standard extension
        if input_structure_file and input_structure_file.extension != input_structure_file.format:
            input_structure_file = input_structure_file.get_standard_file()
            symlink_files.append(input_structure_file)
        # Every trajectory is checked on its own
        # Files which already have the standard extension are never replaced nor scheduled for removal
        standard_trajectory_files = []
        for trajectory_file in input_trajectory_files:
            if trajectory_file and trajectory_file.extension != trajectory_file.format:
                trajectory_file = trajectory_file.get_standard_file()
                symlink_files.append(trajectory_file)
            standard_trajectory_files.append(trajectory_file)
        input_trajectory_files = standard_trajectory_files

        # Get the first trajectory as a sample for those processes which do not require the whole trajectory
        # NOTE(review): File(None) is presumably an empty (falsy) file whose format is None -- confirm
        trajectory_sample = input_trajectory_files[0] if input_trajectory_files else File(None)

        _convert_structure(
            input_structure_file,
            output_structure_file,
            input_trajectory_files,
            trajectory_sample,
        )
        _convert_trajectory(
            input_structure_file,
            input_trajectory_files,
            trajectory_sample,
            output_trajectory_file,
        )
    finally:
        # Remove generated symlinks, if any
        for symlink_file in symlink_files:
            symlink_file.remove()


def _convert_structure(input_structure_file, output_structure_file, input_trajectory_files, trajectory_sample):
    """Generate the output structure file from the input structure, if requested."""
    # If there is no output file it means we have nothing to do here
    if not output_structure_file:
        return
    # If the input and output paths match then we are done
    if input_structure_file.path == output_structure_file.path:
        return
    input_structure_format = input_structure_file.format
    output_structure_format = output_structure_file.format
    # If input and output formats are the same then just link the output to the input
    # NOTE(review): the input may be a temporary standard-extension symlink which is removed
    # when convert finishes -- confirm set_symlink_to resolves the real target file
    if input_structure_format == output_structure_format:
        output_structure_file.set_symlink_to(input_structure_file)
        return
    print(f'Getting structure in {output_structure_format} format from {input_structure_format} file')
    # Otherwise, we must convert
    # Choose the right conversion function according to input and output formats
    request_format_set = {
        'inputs': {
            'input_structure_filename': {input_structure_format},
            'input_trajectory_filename': {trajectory_sample.format},
        },
        'outputs': {
            'output_structure_filename': {output_structure_format},
        },
    }
    suitable = next(get_format_set_suitable_function(
        available_functions=structure_converting_functions,
        available_request_format_sets=[request_format_set],
    ), None)
    # If there is no function to handle this specific conversion we stop here
    if not suitable:
        raise InputError(f'Conversion from {input_structure_format} to {output_structure_format} is not supported')
    converting_function, _formats = suitable
    arguments = {
        'input_structure_filename': input_structure_file.path,
        'output_structure_filename': output_structure_file.path,
    }
    # Find the function keywords
    # This is important since some functions may need a trajectory input in addition
    if 'input_trajectory_filename' in getfullargspec(converting_function).args:
        if not input_trajectory_files:
            raise InputError(f'The structure input format {input_structure_format} is missing coordinates and the output format {output_structure_format} needs them. An input trajectory file is required.')
        arguments['input_trajectory_filename'] = trajectory_sample.path
    converting_function(**arguments)


def _convert_trajectory(input_structure_file, input_trajectory_files, trajectory_sample, output_trajectory_file):
    """Merge and convert the input trajectories into the output trajectory, if requested."""
    # If there is no output file it means we have nothing to do here
    if not output_trajectory_file:
        return
    input_trajectory_format = trajectory_sample.format
    output_trajectory_format = output_trajectory_file.format
    # Shortcuts which are only possible when there is nothing to merge
    if len(input_trajectory_files) == 1:
        # If the input and output files match then we are done
        if trajectory_sample == output_trajectory_file:
            return
        # If the format is already the requested one then just link the output to the input
        if input_trajectory_format == output_trajectory_format:
            output_trajectory_file.set_symlink_to(trajectory_sample)
            return
    print(f'Converting trajectory format from {input_trajectory_format} to {output_trajectory_format}')
    # Otherwise, we must convert
    # Choose the right conversion function according to input and output formats
    request_format_set = {
        'inputs': {
            'input_structure_filename': {input_structure_file.format},
            'input_trajectory_filenames': {input_trajectory_format},
        },
        'outputs': {
            'output_trajectory_filename': {output_trajectory_format},
        },
    }
    suitable = next(get_format_set_suitable_function(
        available_functions=trajectory_converting_functions,
        available_request_format_sets=[request_format_set],
    ), None)
    # If there is no function to handle this specific conversion we try to combine several functions in order to do it
    if not suitable:
        warn('There is no function to do the conversion directly. Trying to combine multiple functions...')
        suitable = next(get_format_set_suitable_combination(
            available_functions=trajectory_converting_functions,
            available_request_format_sets=[request_format_set],
        ), None)
    # If there is still no way to handle this specific conversion we stop here
    if not suitable:
        raise InputError(f'Conversion from {input_trajectory_format} to {output_trajectory_format} is not supported')
    converting_function, formats = suitable
    # Get the input structure expected format
    # False means the function has no input structure argument at all
    expected_input_structure_formats = formats['inputs'].get('input_structure_filename', False)
    arguments = {
        'input_trajectory_filenames': [trajectory_file.path for trajectory_file in input_trajectory_files],
        'output_trajectory_filename': output_trajectory_file.path,
    }
    if expected_input_structure_formats:
        # If the function expects any format then pass the structure
        arguments['input_structure_filename'] = input_structure_file.path
    elif expected_input_structure_formats is None:
        # If the function expects None then pass None
        arguments['input_structure_filename'] = None
    # If the function has not the input structure argument then do not pass it
    converting_function(**arguments)