Coverage for mddb_workflow/tools/conversions.py: 79%

91 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 15:48 +0000

1from typing import Optional 

2from inspect import getfullargspec 

3 

4from mddb_workflow.utils.formats import get_format_set_suitable_function, get_format_set_suitable_combination 

5from mddb_workflow.utils.file import File 

6from mddb_workflow.utils.vmd_spells import vmd_to_pdb 

7from mddb_workflow.utils.gmx_spells import get_structure, get_structure_alone 

8from mddb_workflow.utils.gmx_spells import merge_and_convert_trajectories as gmx_merge_and_convert_trajectories 

9from mddb_workflow.utils.mdt_spells import merge_and_convert_trajectories as mdt_merge_and_convert_trajectories 

10from mddb_workflow.utils.mdt_spells import merge_and_convert_trajectories_alternative as mdt_merge_and_convert_trajectories_alternative 

11from mddb_workflow.utils.mdt_spells import merge_and_convert_trajectories_unefficient as mdt_merge_and_convert_trajectories_unefficient 

12from mddb_workflow.utils.vmd_spells import merge_and_convert_trajectories as vmd_merge_and_convert_trajectories 

13from mddb_workflow.utils.auxiliar import InputError, warn 

14 

15# Set functions to perform structure conversions 

16# These functions must have 'input_structure_filename' and 'output_structure_filename' keywords 

17# These functions must have the 'format_sets' property 

18# These functions may have the 'input_trajectory_filename' keyword 

# Registry of the structure converters handed to the format-set matcher in 'convert'
structure_converting_functions = [
    get_structure,
    get_structure_alone,
    vmd_to_pdb,
]

20 

21# Set functions to perform trajectory conversions 

22# These functions must have 'input_trajectory_filenames' and 'output_trajectory_filename' keywords 

23# These functions must have the 'format_sets' property 

# Registry of the trajectory converters handed to the format-set matcher in 'convert'
# The listing order is kept as is -- presumably it sets the matching priority (TODO confirm)
_mdcrd_only_converter = mdt_merge_and_convert_trajectories_alternative # This should only be used in mdcrd to xtc/trr
trajectory_converting_functions = [
    mdt_merge_and_convert_trajectories,
    gmx_merge_and_convert_trajectories,
    _mdcrd_only_converter,
    vmd_merge_and_convert_trajectories,
    mdt_merge_and_convert_trajectories_unefficient,
]

31 

def convert (
    input_structure_filepath : Optional[str] = '',
    output_structure_filepath : Optional[str] = '',
    input_trajectory_filepaths : Optional[list[str]] = None,
    output_trajectory_filepath : Optional[str] = ''
) -> None:
    """
    Handle conversions of different structure and trajectory formats.
    Merge multiple input trajectories into one single output trajectory.

    Args:
        input_structure_filepath: Path to the input structure file, if any.
        output_structure_filepath: Path of the structure file to generate, if any.
        input_trajectory_filepaths: Paths to the input trajectory files.
            A single string is accepted and treated as a one-element list.
            All trajectories must share the same format.
        output_trajectory_filepath: Path of the trajectory file to generate, if any.

    Returns:
        None. Output files are written (or symlinked) to the requested output paths.

    Raises:
        InputError: If an output is requested without its input, an input file
            does not exist, input trajectories have mixed formats, a structure
            conversion needs a trajectory which was not passed, or there is no
            function supporting the requested conversion.
    """
    # Normalize the input trajectories to a list before validating anything
    # A single string should not happen, but we fix it just in case
    if input_trajectory_filepaths is None:
        input_trajectory_filepaths = []
    elif isinstance(input_trajectory_filepaths, str):
        input_trajectory_filepaths = [input_trajectory_filepaths] if input_trajectory_filepaths else []

    # If we have output but not input we must complain
    # Note that missing input trajectories are fine as long as no output trajectory is requested
    if output_structure_filepath and not input_structure_filepath:
        raise InputError('Missing input structure')
    if output_trajectory_filepath and not input_trajectory_filepaths:
        raise InputError('Missing input trajectory')

    # Parse input filepaths to actual files
    # Note that this step automatically raise input errors if any extension is not recognized
    input_structure_file = File(input_structure_filepath)
    output_structure_file = File(output_structure_filepath)
    input_trajectory_files = [ File(path) for path in input_trajectory_filepaths ]
    output_trajectory_file = File(output_trajectory_filepath)

    # Check input files to exist
    for input_file in [ input_structure_file, *input_trajectory_files ]:
        if input_file and not input_file.exists:
            raise InputError(f'Missing input file {input_file.path}')

    # Check all input trajectory formats are the same
    input_trajectory_formats = { trajectory_file.format for trajectory_file in input_trajectory_files }
    if len(input_trajectory_formats) > 1:
        raise InputError('Input trajectories must have the same format')

    # Get the first trajectory as a sample for those processes which do not require the whole trajectory
    trajectory_sample = input_trajectory_files[0] if input_trajectory_files else File(None)

    # Check if any input file has an non-standard extension of a supported format
    # If so then we create a symlink with the standard extension
    # Save created symlinks to remove them at the end of the process
    # NOTE(review): when input and output formats match, the output is symlinked to these
    # temporary files; confirm 'set_symlink_to' resolves the final target before they are removed
    symlink_files = []
    if input_structure_file and input_structure_file.extension != input_structure_file.format:
        input_structure_file = input_structure_file.get_standard_file()
        symlink_files.append(input_structure_file)
    if trajectory_sample and trajectory_sample.extension != trajectory_sample.format:
        input_trajectory_files = [ trajectory_file.get_standard_file() for trajectory_file in input_trajectory_files ]
        symlink_files += input_trajectory_files
        trajectory_sample = input_trajectory_files[0]

    # Get file formats
    input_structure_format = input_structure_file.format
    output_structure_format = output_structure_file.format
    input_trajectory_format = trajectory_sample.format
    output_trajectory_format = output_trajectory_file.format

    # Convert the structure
    # Do it inside a function just to return as soon as we are done
    def convert_structure ():
        # If there is no output filename it means we have nothing to do here
        if not output_structure_file: return
        # If the input and output names match then we are done
        if input_structure_file.path == output_structure_file.path: return
        # If input and output formats are the same then just link the file with the new name
        if input_structure_format == output_structure_format:
            output_structure_file.set_symlink_to(input_structure_file)
            return
        print(f'Getting structure in {output_structure_format} format from {input_structure_format} file')
        # Otherwise, we must convert
        # Choose the right conversion function according to input and output formats
        request_format_set = {
            'inputs': {
                'input_structure_filename': { input_structure_format },
                'input_trajectory_filename': { input_trajectory_format }
            },
            'outputs': {
                'output_structure_filename': { output_structure_format }
            }
        }
        suitable = next(get_format_set_suitable_function(
            available_functions=structure_converting_functions,
            available_request_format_sets=[request_format_set],
        ), None)
        # If there is no function to handle this specific conversion we stop here
        if not suitable:
            raise InputError(f'Conversion from {input_structure_format} to {output_structure_format} is not supported')
        converting_function, _ = suitable
        # Find the function keywords
        # This is important since some functions may need a trajectory input in addition
        arguments = {
            'input_structure_filename': input_structure_file.path,
            'output_structure_filename': output_structure_file.path
        }
        if 'input_trajectory_filename' in getfullargspec(converting_function).args:
            if not input_trajectory_files:
                raise InputError(f'The structure input format {input_structure_format} is missing coordinates and the output format {output_structure_format} needs them. An input trajectory file is required.')
            arguments['input_trajectory_filename'] = trajectory_sample.path
        converting_function(**arguments)

    # Convert (and merge) the trajectory
    # Do it inside a function just to return as soon as we are done
    def convert_trajectory ():
        # If there is no output filename it means we have nothing to do here
        if not output_trajectory_file: return
        single_trajectory = len(input_trajectory_files) == 1
        # If the input and output names match then we are done
        if single_trajectory and trajectory_sample == output_trajectory_file: return
        # If there is only 1 input trajectory and it has the same format that the output then just link the file with the new name
        if single_trajectory and input_trajectory_format == output_trajectory_format:
            output_trajectory_file.set_symlink_to(trajectory_sample)
            return
        print(f'Converting trajectory format from {input_trajectory_format} to {output_trajectory_format}')
        # Otherwise, we must convert
        # Choose the right conversion function according to input and output formats
        request_format_set = {
            'inputs': {
                'input_structure_filename': { input_structure_format },
                'input_trajectory_filenames': { input_trajectory_format }
            },
            'outputs': {
                'output_trajectory_filename': { output_trajectory_format }
            }
        }
        suitable = next(get_format_set_suitable_function(
            available_functions=trajectory_converting_functions,
            available_request_format_sets=[request_format_set],
        ), None)
        # If there is no function to handle this specific conversion we try to combine several functions in order to do it
        if not suitable:
            warn('There is no function to do the conversion directly. Trying to combine multiple functions...')
            suitable = next(get_format_set_suitable_combination(
                available_functions=trajectory_converting_functions,
                available_request_format_sets=[request_format_set],
            ), None)
        # If there is no function to handle this specific conversion we stop here
        if not suitable:
            raise InputError(f'Conversion from {input_trajectory_format} to {output_trajectory_format} is not supported')
        converting_function, formats = suitable
        # Get the input structure expected format
        # False means the function has not the input structure argument at all
        expected_input_structure_formats = formats['inputs'].get('input_structure_filename', False)
        arguments = {
            'input_trajectory_filenames': [ trajectory_file.path for trajectory_file in input_trajectory_files ],
            'output_trajectory_filename': output_trajectory_file.path
        }
        # If the function expects any format then pass the structure
        if expected_input_structure_formats:
            arguments['input_structure_filename'] = input_structure_file.path
        # If the function expects None then pass None
        elif expected_input_structure_formats is None:
            arguments['input_structure_filename'] = None
        # Otherwise the structure argument is not passed at all
        converting_function(**arguments)

    try:
        convert_structure()
        convert_trajectory()
    finally:
        # Remove generated symlinks, if any
        # Do it even when a conversion fails so we do not leave temporary symlinks behind
        for symlink_file in symlink_files:
            symlink_file.remove()