Coverage for mddb_workflow/utils/tasks.py: 88%

178 statements  

coverage.py v7.12.0, created at 2025-12-03 18:45 +0000

from os import rename, mkdir
from os.path import exists
from shutil import rmtree
from inspect import getfullargspec
import time

from mddb_workflow.utils.arg_cksum import get_cksum_id
from mddb_workflow.utils.auxiliar import safe_getattr
from mddb_workflow.utils.constants import *
from mddb_workflow.utils.file import File
from mddb_workflow.utils.type_hints import *

# Set a special exception for missing task function arguments
# This is used for easy debugging when a new function is added wrongly
MISSING_ARGUMENT_EXCEPTION = Exception('Missing argument')

# Set a special exception for when a value is missing
MISSING_VALUE_EXCEPTION = Exception('Missing value')

# Name of the argument used by all functions to know where to write output
OUTPUT_DIRECTORY_ARG = 'output_directory'
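# A hypothetical illustration of how a task function would declare this argument,
# assuming a function named 'some_analysis' (not part of this module):
#   def some_analysis (structure_file : 'File', output_directory : str): ...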


class Task:
    """Descriptor class to handle a generic task.
    It implements lazy properties, caching and overwriting.

    Since its properties are static, results are stored in the parent object
    (MD/Project), otherwise all MDs would share the same task values.
    """

    def __init__(self,
            flag: str,
            name: str,
            func: Callable,
            args: dict = {},
            output_filenames: dict[str | Callable[[], str]] = {},
            use_cache: bool = True,
            debug: bool = False,
        ):
        """Initialize the Task object.

        Args:
            flag (str):
                The task flag.
                This name is used by the include/exclude/overwrite arguments and to name the analysis output directory.

            name (str):
                The task user-friendly name, to be used in the logs.

            func (Callable):
                The task function.
                Function argument names must correspond with Project/MD property names.

            args (dict, optional):
                The task function additional arguments.
                Project/MD properties are automatically sent to the function as arguments.
                However some analyses have additional arguments (e.g. frames limit, cutoffs, etc.).

            output_filenames (dict, optional):
                The task output filenames.
                Paths will be set automatically relative to the parent project/MD.
                For those tasks which generate a directory with multiple outputs this is not necessary.
                This field may be passed in addition so output files are written outside the directory.
                This may come in handy when these output files are used later in this workflow.

            use_cache (bool, optional):
                Set if the returned output is to be cached.
                Note that argument values are always cached; this is not optional.

            debug (bool, optional):
                If the task is run in debug mode, producing more output logs. Defaults to False.

        """

        # Save input arguments
        self.flag = flag
        self.name = name
        self.func = func
        self.args = args
        self.output_filenames = output_filenames
        self.use_cache = use_cache
        self.debug = debug
        # Set the keys used to store and retrieve data in the parent and cache
        self.parent_output_key = f'_{self.flag}_task_output'
        self.parent_output_file_keys = {}
        for argument in self.output_filenames.keys():
            self.parent_output_file_keys[argument] = f'{self.flag}_task_{argument}'
        self.cache_arg_cksums = f'{self.flag}_task_arg_cksums'
        self.cache_output_key = f'{self.flag}_task_output'
        # For the arg_cksum
        self.__name__ = self.flag

    # Set internal functions to handle parent saved output
    # This output is not saved in the task itself, but in the parent, because the task is static
    def _get_parent_output(self, parent):
        return safe_getattr(parent, self.parent_output_key, MISSING_VALUE_EXCEPTION)
    def _set_parent_output(self, parent, new_output):
        return setattr(parent, self.parent_output_key, new_output)
    def _get_parent_output_file(self, parent, argument):
        parent_output_file_key = self.parent_output_file_keys[argument]
        return safe_getattr(parent, parent_output_file_key, MISSING_VALUE_EXCEPTION)
    def _set_parent_output_file(self, parent, argument, new_file: 'File'):
        parent_output_file_key = self.parent_output_file_keys[argument]
        return setattr(parent, parent_output_file_key, new_file)
    def get_output(self, parent):
        """Get the task output, running the task if necessary."""
        # If we already have a value stored from this run then return it
        output = self._get_parent_output(parent)
        if output != MISSING_VALUE_EXCEPTION: return output
        # Otherwise run the task and return the output
        return self(parent)
    output = property(get_output, None, None, "Task output (read only)")
    # Asking for an output file implies running the Task, then returning the file
    # The argument must be specified, meaning the name of the output argument in the task function
    def get_output_file_getter(self, argument) -> Callable:
        def get_output_file(parent) -> 'File':
            # If we already have a file stored from this run then return it
            file = self._get_parent_output_file(parent, argument)
            if file != MISSING_VALUE_EXCEPTION: return file
            # Otherwise run the task and return the file
            self(parent)
            file = self._get_parent_output_file(parent, argument)
            if file != MISSING_VALUE_EXCEPTION: return file
            raise ValueError(f'Task {self.flag} has no output file after running')
        return get_output_file
    # output_file = property(get_output_file, None, None, "Task output file (read only)")
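    # A hypothetical sketch of how a parent class could expose these getters as
    # properties (the actual wiring lives in the Project/MD classes, not here):
    #
    #   class MD:
    #       ...
    #       rmsd_data = property(rmsd_task.get_output, None, None, "RMSD data (read only)")
    #       rmsd_file = property(rmsd_task.get_output_file_getter('output_analysis_filepath'),
    #           None, None, "RMSD output file (read only)")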

    def __repr__(self):
        """Represent the Task object with its flag."""
        return f'<Task ({self.flag})>'

    def __call__(self, parent: Union['Project', 'MD']):
        """Call the task, running it if necessary."""
        return self._run(parent)

    def _run (self, parent: Union['Project', 'MD']):
        # First of all check if this task has already been done in this very run
        # If so then return the stored value
        output = self._get_parent_output(parent)
        if output != MISSING_VALUE_EXCEPTION: return output
        # Process the task function arguments
        processed_args = {}
        # Get the task function expected arguments
        specification = getfullargspec(self.func)
        expected_arguments = specification.args
        n_default_arguments = len(specification.defaults) if specification.defaults else 0
        # Find out which arguments are optional since they have default values
        default_arguments = set(expected_arguments[::-1][:n_default_arguments])
        # If output_filenames are among the expected arguments then set them here
        writes_output_file = len(self.output_filenames) > 0
        output_files = {}
        for argument, output_filename in self.output_filenames.items():
            # Make sure the declared output filename is expected
            if argument not in expected_arguments:
                raise RuntimeError(f'Unexpected argument "{argument}" in function "{self.func}"')
            # Set the actual output filename, which may be calculated through a function
            if type(output_filename) == str:
                output_filepath = parent.pathify(output_filename)
            elif callable(output_filename):
                # If it is a function then it must be fed with the parent
                # DANI: There is only one case for this: the inpro topology_filepath
                # DANI: In that case the topology already comes with its path relative to the project
                # DANI: It must not be made relative to the MD
                output_filepath = output_filename(parent)
            else: raise RuntimeError(f'Unexpected output filename type "{type(output_filename)}"')
            # Set the output file
            if type(output_filepath) == str:
                output_file = File(output_filepath)
            elif type(output_filepath) == Exception:
                output_file = output_filepath
            else: raise RuntimeError(f'Unexpected output filepath type "{type(output_filepath)}"')
            output_files[argument] = output_file
            self._set_parent_output_file(parent, argument, output_file)
            # Add it to the processed args
            processed_args[argument] = output_file
            # Remove the expected argument from the list
            expected_arguments.remove(argument)
        # If one of the expected arguments is the output_directory then set it here
        # We will set a new directory with the flag name of the task, in the corresponding path
        # Note that while the task is being done the output directory has a different name
        # Thus the directory is hidden and marked as incomplete
        # The final output directory is the one without the incomplete prefix
        writes_output_dir = OUTPUT_DIRECTORY_ARG in expected_arguments
        incomplete_output_directory = None
        final_output_directory = None
        if writes_output_dir:
            # Set the output directory path
            incomplete_output_directory = parent.pathify(INCOMPLETE_PREFIX + self.flag)
            final_output_directory = parent.pathify(self.flag)
            # Add it to the processed args
            processed_args[OUTPUT_DIRECTORY_ARG] = incomplete_output_directory
            # Remove the expected argument from the list
            expected_arguments.remove(OUTPUT_DIRECTORY_ARG)
        # Iterate the remaining expected arguments
        for arg in expected_arguments:
            # First find the argument among the parent properties
            arg_value = self._find_arg_value(arg, parent, default_arguments)
            if arg_value == MISSING_ARGUMENT_EXCEPTION: continue
            # Add the processed argument
            processed_args[arg] = arg_value
        # Check again if the task has output already
        # It may happen that some dependencies assign output on their own
        # e.g. charges, bonds
        # If so then return the stored value
        output = self._get_parent_output(parent)
        if output != MISSING_VALUE_EXCEPTION: return output
        # Find if we have cached output
        if self.use_cache:
            output = parent.cache.retrieve(self.cache_output_key, MISSING_VALUE_EXCEPTION)
            self._set_parent_output(parent, output)
        # Check if this dependency is to be overwritten
        forced_overwrite = self.flag in parent.overwritables
        changed_inputs, had_cache, cache_cksums = self._get_changed_inputs(parent, processed_args)
        any_input_changed = len(changed_inputs) > 0
        # Update the cache inputs
        parent.cache.update(self.cache_arg_cksums, cache_cksums)
        # We must overwrite outputs either if inputs changed or if it was forced by the user
        must_overwrite = forced_overwrite or any_input_changed
        # Check if output already exists
        # If the incomplete directory already exists then it means the task was started in a previous run
        existing_incomplete_output = writes_output_dir and exists(incomplete_output_directory)
        # If the final directory already exists then it means the task was done in a previous run
        existing_final_output = writes_output_dir and exists(final_output_directory)
        # If the output file already exists then it also means the task was done in a previous run
        existing_output_files = writes_output_file and \
            all( output_file == None or type(output_file) == Exception or output_file.exists
                for output_file in output_files.values() )
        # If we already have a cached output result
        existing_output_data = output != MISSING_VALUE_EXCEPTION
        # If we must overwrite then purge previous outputs
        if must_overwrite:
            if existing_incomplete_output: rmtree(incomplete_output_directory)
            if existing_final_output: rmtree(final_output_directory)
            if existing_output_files:
                for output_file in output_files.values():
                    if output_file == None or type(output_file) == Exception: continue
                    if output_file.exists: output_file.remove()
            if existing_output_data: parent.cache.delete(self.cache_output_key)
        # If already existing output is not to be overwritten then check if it is already what we need
        else:
            # If output files/directories are expected then they must exist
            # If output data is expected then it must be cached
            satisfied_output = (not writes_output_dir or existing_final_output) \
                and (not writes_output_file or existing_output_files) \
                and existing_output_data
            if self.debug:
                print(f'existing_output_data: {existing_output_data}')
                print(f'output: {output}')
            # If we already have the expected output then we can skip the task altogether
            if satisfied_output:
                print(f'{GREY_HEADER}-> Task {self.flag} ({self.name}) already completed{COLOR_END}')
                return output
        # If we are at this point then we are missing some output so we must proceed to run the task
        # Use the final output directory instead of the incomplete one if it exists
        # Note that we must check if it exists again since it may have been deleted since the last check
        if writes_output_dir and exists(final_output_directory):
            processed_args[OUTPUT_DIRECTORY_ARG] = final_output_directory
        # Create the incomplete output directory, if necessary
        missing_incomplete_output = writes_output_dir \
            and not exists(incomplete_output_directory) \
            and not exists(final_output_directory)
        if missing_incomplete_output: mkdir(incomplete_output_directory)
        # Finally call the function
        print(f'{GREEN_HEADER}-> Running task {self.flag} ({self.name}){COLOR_END}')
        start_time = time.time()
        # If the task is to be run again because an input changed then let the user know
        if any_input_changed and had_cache and not forced_overwrite:
            changes = ''.join([ '\n - ' + inp for inp in changed_inputs ])
            print(f'{GREEN_HEADER} The task is run again since the following inputs changed:{changes}{COLOR_END}')
        # Save a few internal values in the task although the task is static
        # We save them right before calling the function in case the function uses this task as input
        self.changed_inputs = changed_inputs
        self.cache_cksums = cache_cksums
        # Run the actual task
        output = self.func(**processed_args)
        end_time = time.time()
        print(f' Task {self.flag} completed in {end_time - start_time:.2f} seconds{COLOR_END}')
        self._set_parent_output(parent, output)
        if self.use_cache:
            # Save output in cache unless it is marked to not save it
            # Note that all must be JSON serializable values
            parent.cache.update(self.cache_output_key, output)
        # Update the overwritables so this task is not remade later in the same run
        parent.overwritables.discard(self.flag)
        # Change the incomplete directory name to its final name
        # We do not remove the directory if it is empty anymore
        # The empty directory stands as proof that the task was run successfully
        # Thus its existence prevents the task from being run again later
        # Note that some tasks clean their own intermediate steps to save disk (e.g. inpro)
        if writes_output_dir and exists(incomplete_output_directory):
            rename(incomplete_output_directory, final_output_directory)
        return output

    def _find_arg_value(self, arg: str, parent: Union['Project', 'MD'], default_arguments: set):
        """Find argument values, thus running any dependency if necessary."""
        # The word 'task' is reserved for getting the task itself
        if arg == 'task': return self
        # The word 'self' is reserved for getting the caller Project/MD
        if arg == 'self': return parent
        # Check if the argument is an MD property
        arg_value = safe_getattr(parent, arg, MISSING_ARGUMENT_EXCEPTION)
        if arg_value != MISSING_ARGUMENT_EXCEPTION: return arg_value
        # If the parent is an MD then it may happen that the property is from the Project
        # We can not use the 'isinstance' function here because we can not import the MD class
        if parent.__class__.__name__ == 'MD':
            arg_value = safe_getattr(parent.project, arg, MISSING_ARGUMENT_EXCEPTION)
            if arg_value != MISSING_ARGUMENT_EXCEPTION: return arg_value
        # If the property is missing then search among the additional arguments
        arg_value = self.args.get(arg, MISSING_ARGUMENT_EXCEPTION)
        if arg_value != MISSING_ARGUMENT_EXCEPTION: return arg_value
        # It may also happen that the argument has a default value
        # If this is the case then we can skip it
        if arg in default_arguments: return MISSING_ARGUMENT_EXCEPTION
        # NEVER FORGET: Function arguments must have the same name as the Project/MD property
        # If the argument is still missing then you programmed the function wrongly or...
        # You may have forgotten the additional argument in the task args
        raise RuntimeError(f'Function "{self.func.__name__}" from task "{self.flag}" expects argument "{arg}" but it is missing')
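    # The lookup order above, illustrated with a hypothetical task function signature
    # (names are illustrative only):
    #
    #   def some_analysis (self, task, structure_file, frames_limit = 200): ...
    #
    #   'self'           -> the calling Project/MD instance
    #   'task'           -> this Task instance
    #   'structure_file' -> resolved as a parent (or, for an MD, project) property
    #   'frames_limit'   -> taken from Task.args if declared there, else its default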

    def _get_changed_inputs(self,
            parent: Union['Project', 'MD'],
            processed_args: dict,
        ) -> tuple[list[str], bool, dict]:
        """Find out if input arguments changed with respect to the last run.

        Get the list of inputs which have changed compared to a previous run.
        If had_cache is false then it means this is the first time the task is ever done.

        Warning:
            Always get changed inputs, since this function updates the cache.

        """
        # Get cached argument references
        cache_cksums = parent.cache.retrieve(self.cache_arg_cksums, MISSING_VALUE_EXCEPTION)
        had_cache = False if cache_cksums == MISSING_VALUE_EXCEPTION else True
        if cache_cksums == MISSING_VALUE_EXCEPTION: cache_cksums = {}
        # Check argument by argument
        # Keep a list with the arguments which have changed
        unmatched_arguments = []
        for arg_name, arg_value in processed_args.items():
            # Skip the output directory argument and any output filename arguments
            # Changes in these arguments are not actual "input" changes
            if arg_name == OUTPUT_DIRECTORY_ARG: continue
            if arg_name in self.output_filenames: continue
            # Get the cksum from the new argument value
            new_cksum = get_cksum_id(arg_value)
            # Retrieve the cksum from the old argument value
            old_cksum = cache_cksums.get(arg_name, None)
            if self.debug: print(f'Task "{self.name}" -> argument "{arg_name}"\n' +
                f' new value: {arg_value}\n' +
                f' new value cksum: {new_cksum}\n' +
                f' old value cksum: {old_cksum}\n' +
                f' match: {new_cksum == old_cksum}')
            # Compare new and old cksums
            if new_cksum != old_cksum:
                # If we found a mismatch then add it to the list
                unmatched_arguments.append(arg_name)
                # Update the references
                cache_cksums[arg_name] = new_cksum
        return unmatched_arguments, had_cache, cache_cksums
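    # For illustration, a first ever run of _get_changed_inputs over two hypothetical
    # inputs might return something like (cksum values are made up):
    #   (['structure_file', 'frames_limit'], False,
    #    {'structure_file': '3735928559', 'frames_limit': '195948557'})
    # while an identical second run would return ([], True, { ...the same cksums... })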

    def prefill(self, parent: Union['Project', 'MD'], output, inputs):
        """Assign an output value to a task, thus marking it as already run.

        This function is used in a very specific scenario:
        tasks which generate no output files/directory inside the 'inpro' task.
        """
        # Save the task output in the parent instance
        self._set_parent_output(parent, output)
        # Update the cached output unless it is marked to not save it
        if self.use_cache: parent.cache.update(self.cache_output_key, output)
        # Save input cksums to avoid repeating this task in future runs
        _, _, cache_cksums = self._get_changed_inputs(parent, inputs)
        parent.cache.update(self.cache_arg_cksums, cache_cksums)
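    # A hypothetical prefill call, as it could appear inside another task that has
    # already computed this task's result as a side effect (names are illustrative):
    #
    #   charges_task.prefill(parent,
    #       output = computed_charges,
    #       inputs = { 'topology_file': topology_file })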