Coverage for model_workflow/utils/formats.py: 75%

198 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1import os 

2from typing import Optional, List, Tuple, Callable, Generator 

3from inspect import getfullargspec 

4 

def get_format (filename : Optional[str]) -> Optional[str]:
    """
    Get a filename format, i.e. the text after the last dot in the filename.

    None is returned when the filename is empty or None.
    A filename with no dot at all is returned whole, since there is nothing to split.
    """
    # Both None and the empty string have no format
    if not filename:
        return None
    # Only the last dot matters, so split once from the right
    return filename.rsplit('.', 1)[-1]

10 

def get_format_set_suitable_function (
    available_functions : List[Callable],
    available_request_format_sets : List[dict],
) -> Generator[ Optional[ Tuple[ Callable, dict ]], None, None ]:
    """
    Iterate over the functions which are suitable for any of the available request "format sets".
    Request format sets are tried in order and, for each of them, every function is tried in order.
    Every match is yielded as a tuple with the function and a newly generated format set
    which contains only the formats in common between the request and the function.
    Nothing is yielded when there is no suitable function.

    WARNING: Available functions must have the 'format_sets' property.
    Format sets are dictionaries which specify input and output formats.
    Consider function format sets as 'required input' and 'available output'.
    Consider request format sets as 'available input' and 'required output'.
    Both inputs and outputs are dictionaries where keys are function arguments and values are sets of supported formats.
    Alternatively, an argument may have None as value to represent unnecessary requirements or missing availabilities.

    An example is shown below:
    {
        'inputs': {
            'input_structure_filename': {'tpr'},
            'input_trajectory_filenames': {'xtc', 'trr'},
        },
        'outputs': {
            'output_trajectory_filename': {'pdb', 'gro'}
        },
    }

    Functions may have multiple format sets since different input formats may lead to different output formats.

    A SystemExit is raised as soon as a function format set has argument names
    which are not compatible with the request format set being tried.
    """
    # To debug, print 'available_request_format_sets' and every function 'format_sets' here
    # Lazily walk every (request format set, function, function format set) candidate in order
    candidates = (
        (requested, function, offered)
        for requested in available_request_format_sets
        for function in available_functions
        for offered in function.format_sets
    )
    for requested, function, offered in candidates:
        # Argument names must be compatible before comparing any format
        if not check_format_sets_compability(requested, offered):
            raise SystemExit('Format keys are not compatible with function ' + str(function.__name__))
        # The function required inputs must be fulfilled by the request available inputs
        matched_inputs = get_common_argument_formats(
            offered.get('inputs', None),
            requested.get('inputs', None),
        )
        if not matched_inputs:
            continue
        # The request required outputs must be fulfilled by the function available outputs
        matched_outputs = get_common_argument_formats(
            requested.get('outputs', None),
            offered.get('outputs', None),
        )
        if not matched_outputs:
            continue
        # Both sides match: hand back the function together with the formats in common
        yield function, { 'inputs': matched_inputs, 'outputs': matched_outputs }

75 

def get_common_argument_formats (required_arguments : Optional[dict], available_arguments : Optional[dict]):
    """
    Get compatible formats between two groups of arguments.
    All required argument formats must be fulfilled by the available argument formats.
    Arguments are defined as dictionaries with argument names as keys and sets of available formats as values.
    e.g.

    {
        'input_structure_filename': {'tpr'},
        'input_trajectory_filenames': {'xtc', 'trr'},
    }

    The returned value is one of the following:
    - The set {'not_required'} when there are no required arguments (note it is truthy)
    - None when any required argument cannot be fulfilled
    - A dictionary with the required argument names as keys and the formats in common as values
      An argument whose required formats are None keeps None as value
    """
    # If there are no required arguments we return a truthy placeholder, so callers do not discard the match
    if not required_arguments:
        return {'not_required'}
    # If there are no available arguments we return None
    if not available_arguments:
        return None
    # Set a dictionary with the same keys as the required arguments including only the common formats
    common_argument_formats = {}
    # Iterate over each required argument
    for required_argument, required_formats in required_arguments.items():
        # If there is no format required for this argument we set None for this argument common formats
        if required_formats is None:
            common_argument_formats[required_argument] = None
            continue
        # Get the available formats for the required input
        # NOTE (original author): careful here, the 'get' may be unnecessary
        available_formats = available_arguments.get(required_argument)
        # If the available arguments are missing this required argument then the function is not compatible
        if available_formats is None:
            return None
        # Find the formats in common between both the required and the available arguments
        common_formats = available_formats.intersection(required_formats)
        # If the common formats set is empty for this required argument then the function is not compatible
        if not common_formats:
            return None
        common_argument_formats[required_argument] = common_formats
    return common_argument_formats

114 

115 

def check_format_sets_compability (request_format_set : dict, function_format_set : dict) -> bool:
    """
    Check two format sets to be compatible.
    Both function and request format sets must match in their requirements.
    i.e. all function format set input arguments must be included in request format set input arguments.
    i.e. all request format set output arguments must be included in function format set output arguments.

    Only argument names are compared here, not their formats.
    The reason of the first incompatibility found is printed and False is returned.
    """
    # Check the function input keywords to exist in the request input arguments
    required_inputs = function_format_set.get('inputs')
    if required_inputs:
        available_inputs = request_format_set.get('inputs')
        if not available_inputs:
            print('ERROR: Missing inputs')
            return False
        for argument in required_inputs:
            if argument not in available_inputs:
                print('ERROR: Missing ' + argument + ' argument')
                return False
    # Check the request output keywords to exist in the function output arguments
    required_outputs = request_format_set.get('outputs')
    if required_outputs:
        available_outputs = function_format_set.get('outputs')
        if not available_outputs:
            print('ERROR: Missing outputs')
            return False
        for argument in required_outputs:
            if argument not in available_outputs:
                print('ERROR: Missing ' + argument + ' argument')
                return False
    return True

150 

def _get_argument_formats (arguments : Optional[dict], argument : str) -> set:
    """Get the formats of an argument as a set, which is empty when the arguments or the argument are missing or None."""
    if not arguments:
        return set()
    return arguments.get(argument) or set()


def _build_combined_function (functions : List[Callable], function_common_inputs : List[dict]) -> Callable:
    """
    Combine a chain of functions into one single function which runs them in order.

    The chain is received as arguments, not read from the caller loop variables,
    so every combined function keeps its own chain no matter when it is called.
    'function_common_inputs' has, for each function in the chain, its input formats in common
    with the formats available at its step. These are used to decide which intermediate formats to produce.
    """
    def combined_function (
        input_structure_filename : Optional[str] = None,
        input_trajectory_filenames : Optional[List[str]] = None,
        output_structure_filename : Optional[str] = None,
        output_trajectory_filename : Optional[str] = None
    ):
        auxiliar_filenames = []
        available_structure_filenames = [ input_structure_filename ]
        available_trajectory_filenames = [ input_trajectory_filenames ] # This is a list of lists
        current_input_structure_filename = input_structure_filename
        current_input_trajectory_filenames = input_trajectory_filenames
        functions_count = len(functions)
        try:
            for i, function in enumerate(functions):
                already_existing_structure = None
                already_existing_trajectories = None
                # Get the next function common inputs in order to know what format we must output
                next_function_index = i + 1
                if next_function_index < functions_count:
                    next_function_common_inputs = function_common_inputs[next_function_index]
                    # A function with no required inputs has a placeholder set instead of a dictionary
                    if not isinstance(next_function_common_inputs, dict):
                        next_function_common_inputs = {}
                    # Find the formats for the outputs. Use the first common format to do so
                    # First select the structure format
                    current_output_structure_filename = None
                    next_structure_formats = next_function_common_inputs.get('input_structure_filename', None)
                    if next_structure_formats:
                        output_structure_format = next(iter(next_structure_formats))
                        # Leave the output as None if there is a structure with the desired format already
                        # Otherwise, create it using an auxiliar filename
                        already_existing_structure = next(
                            ( structure for structure in available_structure_filenames if get_format(structure) == output_structure_format ),
                            None
                        )
                        if not already_existing_structure:
                            current_output_structure_filename = '.structure.' + output_structure_format
                            auxiliar_filenames.append(current_output_structure_filename)
                    # Then select the trajectory format
                    current_output_trajectory_filename = None
                    next_trajectory_formats = next_function_common_inputs.get('input_trajectory_filenames', None)
                    if next_trajectory_formats:
                        output_trajectory_format = next(iter(next_trajectory_formats))
                        # Leave the output as None if there are trajectories with the desired format already
                        # Otherwise, create it using an auxiliar filename
                        # Missing (None) or empty trajectory lists are skipped since they have no first element
                        already_existing_trajectories = next(
                            ( trajectories for trajectories in available_trajectory_filenames
                                if trajectories and get_format(trajectories[0]) == output_trajectory_format ),
                            None
                        )
                        if not already_existing_trajectories:
                            current_output_trajectory_filename = '.trajectory.' + output_trajectory_format
                            auxiliar_filenames.append(current_output_trajectory_filename)
                # In case this is the last function use the final output filenames
                else:
                    current_output_structure_filename = output_structure_filename
                    current_output_trajectory_filename = output_trajectory_filename
                # Set the arguments to be passed to the function
                # This has to be anticipated since we cannot pass an argument the function does not expect
                converting_function_arguments = getfullargspec(function)[0]
                passing_arguments = {}
                if 'input_structure_filename' in converting_function_arguments:
                    passing_arguments['input_structure_filename'] = current_input_structure_filename
                if 'input_trajectory_filenames' in converting_function_arguments:
                    passing_arguments['input_trajectory_filenames'] = current_input_trajectory_filenames
                if 'output_structure_filename' in converting_function_arguments:
                    passing_arguments['output_structure_filename'] = current_output_structure_filename
                if 'output_trajectory_filename' in converting_function_arguments:
                    passing_arguments['output_trajectory_filename'] = current_output_trajectory_filename
                # Execute the current function
                function(**passing_arguments)
                # Now set the inputs for the next function
                # Also update the available structure/trajectory files in case a further function wants to reuse them
                if already_existing_structure:
                    current_input_structure_filename = already_existing_structure
                else:
                    current_input_structure_filename = current_output_structure_filename
                    available_structure_filenames.append(current_output_structure_filename)
                if already_existing_trajectories:
                    current_input_trajectory_filenames = already_existing_trajectories
                else:
                    current_input_trajectory_filenames = [ current_output_trajectory_filename ]
                    available_trajectory_filenames.append([ current_output_trajectory_filename ])
        finally:
            # Remove auxiliar files, also when a step failed, so intermediate files are not left behind
            # A file may not exist yet if the chain failed before writing it
            for auxiliar_filename in auxiliar_filenames:
                if os.path.exists(auxiliar_filename):
                    os.remove(auxiliar_filename)
    return combined_function


def get_format_set_suitable_combination (
    available_functions : List[Callable],
    available_request_format_sets : List[dict],
) -> Generator[ Tuple[ Callable, dict ], None, None ]:
    """
    Iterate over the combinations of functions which, when chained, are suitable for any of the request format sets.
    Every combination is yielded as a tuple with a single combined function and its format set.
    The combined function has the format set assigned to its 'format_sets' property as well.

    WARNING: This function makes only sets for those functions whose output can be reused as input of others
    WARNING: i.e. functions which return structure/trajectory files
    WARNING: This function should be called only when "get_format_set_suitable_function" has failed

    A ValueError is raised when a function format set has argument names
    which are not compatible with the request format set being tried.
    """
    # Try with each request format set
    for request_format_set in available_request_format_sets:

        # Get the required outputs
        required_outputs = request_format_set.get('outputs', None)

        # For each function + format set possibility which is compatible with the available inputs,
        # yield the functions chain, the common inputs of every function in the chain and the final common outputs
        def get_combinations (
            current_functions : List[Callable],
            current_function_common_inputs : List[dict],
            available_inputs : Optional[dict],
        ):
            # Search functions to match formats for every required argument
            for function in available_functions:
                # Test every function format set independently
                for function_format_set in function.format_sets:
                    # Check format keys are compatible
                    if not check_format_sets_compability(request_format_set, function_format_set):
                        raise ValueError('Format keys are not compatible with function ' + str(function.__name__))
                    # Check the function inputs to be fulfilled by the available inputs
                    required_inputs = function_format_set.get('inputs', None)
                    common_inputs = get_common_argument_formats(required_inputs, available_inputs)
                    # If any of the common format sets was empty it means formats do not match
                    if not common_inputs:
                        continue
                    # We add the current function to the list of functions to combine
                    new_functions = [ *current_functions, function ]
                    new_function_common_inputs = [ *current_function_common_inputs, common_inputs ]
                    # Check the request outputs to be fulfilled by the function outputs
                    available_outputs = function_format_set.get('outputs', None)
                    common_outputs = get_common_argument_formats(required_outputs, available_outputs)
                    # If there are common outputs then we have found a successful combination
                    if common_outputs:
                        yield new_functions, new_function_common_inputs, common_outputs
                    # Try another jump by merging the current available inputs with the current function available outputs
                    # Then, using all these formats as available inputs, try to find another function
                    current_structure_inputs = _get_argument_formats(available_inputs, 'input_structure_filename')
                    current_trajectory_inputs = _get_argument_formats(available_inputs, 'input_trajectory_filenames')
                    new_structure_inputs = _get_argument_formats(available_outputs, 'output_structure_filename')
                    new_trajectory_inputs = _get_argument_formats(available_outputs, 'output_trajectory_filename')
                    next_structure_inputs = current_structure_inputs.union(new_structure_inputs)
                    next_trajectory_inputs = current_trajectory_inputs.union(new_trajectory_inputs)
                    # If current function did not add new formats to the already available formats then current step was useless
                    # In this case, stop here. This is also what keeps the recursion finite
                    if len(next_structure_inputs) == len(current_structure_inputs) and len(next_trajectory_inputs) == len(current_trajectory_inputs):
                        continue
                    # Build the new available inputs dictionary
                    new_available_inputs = {
                        'input_structure_filename': next_structure_inputs,
                        'input_trajectory_filenames': next_trajectory_inputs
                    }
                    # In case we have new available input formats we find a new function to get the final desired output format
                    yield from get_combinations(new_functions, new_function_common_inputs, new_available_inputs)

        # Get every possible combination after combining the corresponding functions
        first_available_inputs = request_format_set.get('inputs', None)
        for functions, function_common_inputs, last_common_outputs in get_combinations([], [], first_available_inputs):
            # Combine all functions into one single function
            combined_function = _build_combined_function(functions, function_common_inputs)
            # Set the combined function format set
            combined_format_set = {
                'inputs': first_available_inputs,
                'outputs': last_common_outputs
            }
            combined_function.format_sets = [combined_format_set]

            yield combined_function, combined_format_set

324 

325 

326# Structure file formats 

def is_pdb (filename : str) -> bool:
    """Check if a filename ends with the '.pdb' extension."""
    return filename.endswith('.pdb')

329 

def is_psf (filename : str) -> bool:
    """Check if a filename ends with the '.psf' extension."""
    return filename.endswith('.psf')

332 

def is_tpr (filename : str) -> bool:
    """Check if a filename ends with the '.tpr' extension."""
    return filename.endswith('.tpr')

335 

def is_gro (filename : str) -> bool:
    """Check if a filename ends with the '.gro' extension."""
    return filename.endswith('.gro')

338 

def is_prmtop (filename : str) -> bool:
    """Check if a filename ends with the '.prmtop' extension."""
    return filename.endswith('.prmtop')

341 

def is_top (filename : str) -> bool:
    """Check if a filename ends with the '.top' extension."""
    return filename.endswith('.top')

344 

345# Trajectory file formats 

346 

def is_xtc (filename : str) -> bool:
    """Check if a filename ends with the '.xtc' extension."""
    return filename.endswith('.xtc')

349 

def is_dcd (filename : str) -> bool:
    """Check if a filename ends with the '.dcd' extension."""
    return filename.endswith('.dcd')

352 

def is_netcdf (filename : str) -> bool:
    """Check if a filename ends with the '.nc' extension."""
    return filename.endswith('.nc')

355 

def are_xtc (filenames : list) -> bool:
    """Check if every filename in a list ends with the '.xtc' extension. An empty list results in True."""
    # A generator lets 'all' stop at the first mismatch without building a list
    return all(is_xtc(filename) for filename in filenames)

358 

def are_dcd (filenames : list) -> bool:
    """Check if every filename in a list ends with the '.dcd' extension. An empty list results in True."""
    # A generator lets 'all' stop at the first mismatch without building a list
    return all(is_dcd(filename) for filename in filenames)

361 

def are_netcdf (filenames : list) -> bool:
    """Check if every filename in a list ends with the '.nc' extension. An empty list results in True."""
    # A generator lets 'all' stop at the first mismatch without building a list
    return all(is_netcdf(filename) for filename in filenames)

364 

365# Extra formats logic 

366 

def is_pytraj_supported (filename : str) -> bool:
    """Check if a file may be read by pytraj according to its format (prmtop, top or psf)."""
    # Checkers are tried in order and the search stops at the first one which matches
    supported_format_checkers = (is_prmtop, is_top, is_psf)
    return any(has_format(filename) for has_format in supported_format_checkers)

370 

371# From GitHub: 

372# ParmFormatDict = { 

373# "AMBERPARM": AMBERPARM, 

374# "PDBFILE": PDBFILEPARM, 

375# "MOL2FILE": MOL2FILEPARM, 

376# "CHARMMPSF": CHARMMPSF, 

377# "CIFFILE": CIFFILE, 

378# "GMXTOP": GMXTOP, 

379# "SDFFILE": SDFFILE, 

380# "TINKER": TINKERPARM, 

381# "UNKNOWN_PARM": UNKNOWN_PARM, 

382# } 

383 

def get_pytraj_parm_format (filename : str) -> str:
    """
    Get the pytraj format key for the write_parm function for a specific file according to its format.
    A ValueError is raised when the file format has no pytraj format key assigned.
    """
    # Every format checker is paired with its pytraj format key. They are tried in order
    pytraj_keys_by_checker = (
        (is_prmtop, 'AMBERPARM'),
        (is_psf, 'CHARMMPSF'),
        (is_top, 'GMXTOP'),
        (is_pdb, 'PDBFILE'),
    )
    for has_format, pytraj_key in pytraj_keys_by_checker:
        if has_format(filename):
            return pytraj_key
    raise ValueError('The file ' + filename + ' format is not supported')