Coverage for model_workflow/tools/conversions.py: 76%

96 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1from shutil import copyfile 

2from typing import List, Optional 

3from inspect import getfullargspec 

4 

5from model_workflow.utils.formats import get_format_set_suitable_function, get_format_set_suitable_combination 

6from model_workflow.utils.file import File 

7from model_workflow.utils.vmd_spells import vmd_to_pdb 

8from model_workflow.utils.gmx_spells import get_structure, get_structure_alone 

9from model_workflow.utils.gmx_spells import merge_and_convert_trajectories as gmx_merge_and_convert_trajectories 

10from model_workflow.utils.mdt_spells import merge_and_convert_trajectories as mdt_merge_and_convert_trajectories 

11from model_workflow.utils.mdt_spells import merge_and_convert_trajectories_alternative as mdt_merge_and_convert_trajectories_alternative 

12from model_workflow.utils.mdt_spells import merge_and_convert_trajectories_unefficient as mdt_merge_and_convert_trajectories_unefficient 

13from model_workflow.utils.vmd_spells import merge_and_convert_trajectories as vmd_merge_and_convert_trajectories 

14from model_workflow.utils.auxiliar import InputError, warn 

15 

16# Set functions to performe structure conversions 

17# These functions must have 'input_structure_filename' and 'output_structure_filename' keywords 

18# These functions must have the 'format_sets' property 

19# These functions may have the 'input_trajectory_filename' keyword 

20structure_converting_functions = [ get_structure, get_structure_alone, vmd_to_pdb ] 

21 

22# Set functions to performe trajectory conversions 

23# These functions must have 'input_trajectory_filename' and 'output_trajectory_filepath' keywords 

24# These functions must have the 'format_sets' property 

25trajectory_converting_functions = [ 

26 mdt_merge_and_convert_trajectories, 

27 gmx_merge_and_convert_trajectories, 

28 mdt_merge_and_convert_trajectories_alternative, # This should only be used in mdcrd to xtc/trr 

29 vmd_merge_and_convert_trajectories, 

30 mdt_merge_and_convert_trajectories_unefficient 

31] 

32 

33def convert ( 

34 input_structure_filepath : Optional[str] = '', 

35 output_structure_filepath : Optional[str] = '', 

36 input_trajectory_filepaths : Optional[List[str]] = [], 

37 output_trajectory_filepath : Optional[str] = '' 

38): 

39 """ 

40 Handle conversions of different structure and trajectory formats. 

41 Merge multiple input trajectories into one single output trajectory. 

42 Inputs are the original strucutre and/or trajectory files and the list of possible output filenames. 

43 Only one of each group of output filenames will be generated (if possible). 

44 Return the names of the generated output files. 

45 If we have output but not input we must complain. 

46 """ 

47 if output_structure_filepath and not input_structure_filepath: 

48 raise InputError('Missing input structure') 

49 if output_trajectory_filepath and not input_trajectory_filepaths or len(input_trajectory_filepaths) == 0: 

50 raise InputError('Missing input trajectory') 

51 

52 # If the input trajectory filename is not a list but a single string (which should not happen) then fix it 

53 if type(input_trajectory_filepaths) == str: 

54 input_trajectory_filepaths = [input_trajectory_filepaths] 

55 

56 # Parse input filepaths to actual files 

57 # Note that this step automatically raise input errors if any extension is not recognized 

58 input_structure_file = File(input_structure_filepath) 

59 output_structure_file = File(output_structure_filepath) 

60 input_trajectory_files = [ File(path) for path in input_trajectory_filepaths ] 

61 output_trajectory_file = File(output_trajectory_filepath) 

62 

63 # Check input files to exist 

64 input_files = [ input_structure_file ] + input_trajectory_files 

65 for input_file in input_files: 

66 if input_file and not input_file.exists: 

67 raise InputError('Missing input file ' + input_file.path) 

68 

69 # Check all input trajectory formats are the same 

70 input_trajectory_formats = set([ trajectory_file.format for trajectory_file in input_trajectory_files ]) 

71 if len(input_trajectory_formats) > 1: 

72 raise InputError('Input trajectories must have the same format') 

73 

74 # Get the first trajectory as a sample for those processes which do not require the whole trajectory 

75 trajectory_sample = input_trajectory_files[0] if len(input_trajectory_files) > 0 else File(None) 

76 

77 # Check if any input file has an non-standard extension of a supported format 

78 # If so then we create a symlink with the standard extension 

79 # Save created symlinks to remove them at then of the process 

80 symlink_files = [] 

81 if input_structure_file and input_structure_file.extension != input_structure_file.format: 

82 input_structure_file = input_structure_file.get_standard_file() 

83 symlink_files.append(input_structure_file) 

84 if trajectory_sample and trajectory_sample.extension != trajectory_sample.format: 

85 input_trajectory_files = [ trajectory_file.get_standard_file() for trajectory_file in input_trajectory_files ] 

86 symlink_files += input_trajectory_files 

87 trajectory_sample = input_trajectory_files[0] 

88 

89 # Get file formats 

90 input_structure_format = input_structure_file.format 

91 output_structure_format = output_structure_file.format 

92 input_trajectory_format = trajectory_sample.format 

93 output_trajectory_format = output_trajectory_file.format 

94 

95 # Convert the structure 

96 # Do it inside a function just to return as soon as we are done 

97 def convert_structure (): 

98 # If there is no output filename it means we have nothing to do here 

99 if not output_structure_file: 

100 return 

101 # If the input and output names match then we are done 

102 if input_structure_file.path == output_structure_file.path: 

103 return 

104 # If input and output formats are the same then just copy the file with the new name 

105 if input_structure_format == output_structure_format: 

106 copyfile(input_structure_file.path, output_structure_file.path) 

107 return 

108 print(f'Getting structure in {output_structure_format} format from {input_structure_format} file') 

109 # Otherwise, we must convert 

110 # Choose the right conversion function according to input and output formats 

111 request_format_set = { 

112 'inputs': { 

113 'input_structure_filename': { input_structure_format }, 

114 'input_trajectory_filename': { input_trajectory_format } 

115 }, 

116 'outputs': { 

117 'output_structure_filename': { output_structure_format } 

118 } 

119 } 

120 suitable = next(get_format_set_suitable_function( 

121 available_functions=structure_converting_functions, 

122 available_request_format_sets=[request_format_set], 

123 ), None) 

124 # If there is no function to handle this specific conversion we stop here 

125 if not suitable: 

126 raise InputError(f'Conversion from {input_structure_format} to {output_structure_format} is not supported') 

127 converting_function, formats = suitable 

128 # Find the function keywords 

129 # This is important since some functions may need a trajectory input in addition 

130 converting_function_keywords = getfullargspec(converting_function)[0] 

131 required_trajectory = 'input_trajectory_filename' in converting_function_keywords 

132 if required_trajectory: 

133 if len(input_trajectory_files) == 0: 

134 raise InputError(f'The structure input format {input_structure_format} is missing coordinates and the output format {output_structure_format} needs them. An input trajectory file is required.') 

135 converting_function( 

136 input_structure_filename=input_structure_file.path, 

137 input_trajectory_filename=trajectory_sample.path, 

138 output_structure_filename=output_structure_file.path 

139 ) 

140 else: 

141 converting_function( 

142 input_structure_filename=input_structure_file.path, 

143 output_structure_filename=output_structure_file.path 

144 ) 

145 convert_structure() 

146 

147 def convert_trajectory (): 

148 # If there is no output filename it means we have nothing to do here 

149 if not output_trajectory_file: 

150 return 

151 # If the input and output names match then we are done 

152 trajectory_files_count = len(input_trajectory_files) 

153 if trajectory_files_count == 1 and trajectory_sample == output_trajectory_file: 

154 return 

155 # If there is only 1 input trajectory and it has the same format that the output then just copy the file with the new name 

156 if trajectory_files_count == 1 and input_trajectory_format == output_trajectory_format: 

157 copyfile(trajectory_sample.path, output_trajectory_file.path) 

158 return 

159 print(f'Converting trajectory format from {input_trajectory_format} to {output_trajectory_format}') 

160 # Otherwise, we must convert 

161 # Choose the right conversion function according to input and output formats 

162 request_format_set = { 

163 'inputs': { 

164 'input_structure_filename': { input_structure_format }, 

165 'input_trajectory_filenames': { input_trajectory_format } 

166 }, 

167 'outputs': { 

168 'output_trajectory_filename': { output_trajectory_format } 

169 } 

170 } 

171 suitable = next(get_format_set_suitable_function( 

172 available_functions=trajectory_converting_functions, 

173 available_request_format_sets=[request_format_set], 

174 ), None) 

175 # If there is no function to handle this specific conversion we try to combine several functions in order to do it 

176 if not suitable: 

177 warn('There is no function to do the conversion directly. Trying to combine multiple functions...') 

178 suitable = next(get_format_set_suitable_combination( 

179 available_functions=trajectory_converting_functions, 

180 available_request_format_sets=[request_format_set], 

181 ), None) 

182 # If there is no function to handle this specific conversion we stop here 

183 if not suitable: 

184 raise InputError(f'Conversion from {input_trajectory_format} to {output_trajectory_format} is not supported') 

185 converting_function, formats = suitable 

186 # Get the input structure expected format 

187 expected_input_structure_formats = formats['inputs'].get('input_structure_filename', False) 

188 # Get the absolute paths of input trajectory files 

189 trajectory_filepaths = [ trajectory_file.path for trajectory_file in input_trajectory_files ] 

190 # If the function expects any fromat then pass the structure 

191 if expected_input_structure_formats: 

192 converting_function( 

193 input_structure_filename=input_structure_file.path, 

194 input_trajectory_filenames=trajectory_filepaths, 

195 output_trajectory_filename=output_trajectory_file.path 

196 ) 

197 # If the function expects None then pass None 

198 elif expected_input_structure_formats == None: 

199 converting_function( 

200 input_structure_filename=None, 

201 input_trajectory_filenames=trajectory_filepaths, 

202 output_trajectory_filename=output_trajectory_file.path 

203 ) 

204 # If the function has not the input structure argument then do not pass it 

205 else: 

206 converting_function( 

207 input_trajectory_filenames=trajectory_filepaths, 

208 output_trajectory_filename=output_trajectory_file.path 

209 ) 

210 convert_trajectory() 

211 

212 # Remove generated symlinks, if any 

213 for symlink_file in symlink_files: 

214 symlink_file.remove()