Coverage for mddb_workflow/utils/tasks.py: 88% (178 statements)
from os import rename, mkdir
from os.path import exists
from shutil import rmtree
from inspect import getfullargspec
import time

from mddb_workflow.utils.arg_cksum import get_cksum_id
from mddb_workflow.utils.auxiliar import safe_getattr
from mddb_workflow.utils.constants import *
from mddb_workflow.utils.file import File
from mddb_workflow.utils.type_hints import *

# Set a special exception for missing task function arguments
# This is used for easy debugging when a new function is added incorrectly
MISSING_ARGUMENT_EXCEPTION = Exception('Missing argument')

# Set a special exception for when a value is missing
MISSING_VALUE_EXCEPTION = Exception('Missing value')

# Name of the argument used by all functions to know where to write their output
OUTPUT_DIRECTORY_ARG = 'output_directory'
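
# For illustration only (the function below is hypothetical, not part of the workflow):
# a task function that writes results to disk just declares an argument with this name
# and the Task machinery below creates and passes the proper directory for it, e.g.
#
#   def some_analysis (structure_file, output_directory):
#       ...  # write result files inside output_directory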

class Task:
    """Descriptor class to handle a generic task.
    It implements lazy properties, caching and overwriting.

    Since its properties are static, results are stored in the parent object
    (MD/Project); otherwise all MDs would share the same task values.
    """

    def __init__(self,
        flag: str,
        name: str,
        func: Callable,
        args: dict = {},
        output_filenames: dict[str | Callable[[], str]] = {},
        use_cache: bool = True,
        debug: bool = False,
    ):
41 """Initialize the Task object.
43 Args:
44 flag (str):
45 The task flag.
46 This name is used by the include/exclude/overwrite arguments and to name the analysis output directory.
48 name (str):
49 The task user-friendly name is to be used in the logs.
51 func (Callable):
52 The task function.
53 Function argument names must correspond with Project/MD property names.
55 args (dict, optional):
56 The task function additional arguments.
57 Project/MD properties are automatically sent to the function as arguments.
58 However some analyses have additional arguments (e.g. frames limit, cutoffs, etc.)
60 output_filenames (dict, optional):
61 The task output filenames.
62 Paths will be set automatically relative to its project/MD.
63 For those tasks which generate a directory with multiple outputs this is not necessary.
64 This field may be passed in addition so output files are written outside the directory.
65 This may come in handy when these output files are used later in this workflow.
67 use_cache (bool, optional):
68 Set if the returned output is to be cached.
69 Note that argument values are always cached, this is not optional.
71 debug (bool, optional):
72 If the task is run in debug mode, producing more output logs. Defaults to False.
74 """
        # Save input arguments
        self.flag = flag
        self.name = name
        self.func = func
        self.args = args
        self.output_filenames = output_filenames
        self.use_cache = use_cache
        self.debug = debug
        # Set the keys used to store and retrieve data in the parent and cache
        self.parent_output_key = f'_{self.flag}_task_output'
        self.parent_output_file_keys = {}
        for argument in self.output_filenames.keys():
            self.parent_output_file_keys[argument] = f'{self.flag}_task_{argument}'
        self.cache_arg_cksums = f'{self.flag}_task_arg_cksums'
        self.cache_output_key = f'{self.flag}_task_output'
        # For the arg_cksum
        self.__name__ = self.flag

    # Set internal functions to handle parent saved output
    # This output is not saved in the task itself, but in the parent, because the task is static
    def _get_parent_output(self, parent):
        return safe_getattr(parent, self.parent_output_key, MISSING_VALUE_EXCEPTION)
    def _set_parent_output(self, parent, new_output):
        return setattr(parent, self.parent_output_key, new_output)
    def _get_parent_output_file(self, parent, argument):
        parent_output_file_key = self.parent_output_file_keys[argument]
        return safe_getattr(parent, parent_output_file_key, MISSING_VALUE_EXCEPTION)
    def _set_parent_output_file(self, parent, argument, new_file: 'File'):
        parent_output_file_key = self.parent_output_file_keys[argument]
        return setattr(parent, parent_output_file_key, new_file)
    def get_output(self, parent):
        """Get the task output, running the task if necessary."""
        # If we already have a value stored from this run then return it
        output = self._get_parent_output(parent)
        if output != MISSING_VALUE_EXCEPTION: return output
        # Otherwise run the task and return the output
        return self(parent)
    output = property(get_output, None, None, "Task output (read only)")
    # Asking for an output file implies running the Task, then returning the file
    # The argument must be specified, meaning the name of the output argument in the task function
    def get_output_file_getter(self, argument) -> Callable:
        def get_output_file(parent) -> 'File':
            # If we already have a file stored from this run then return it
            file = self._get_parent_output_file(parent, argument)
            if file != MISSING_VALUE_EXCEPTION: return file
            # Otherwise run the task and return the file
            self(parent)
            file = self._get_parent_output_file(parent, argument)
            if file != MISSING_VALUE_EXCEPTION: return file
            raise ValueError(f'Task {self.flag} has no output file after running')
        return get_output_file
    # output_file = property(get_output_file, None, None, "Task output file (read only)")
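    # A hedged wiring sketch (the property and task names below are hypothetical):
    # the parent class may expose a task output file as a lazy property, e.g.
    #
    #   class MD:
    #       ...
    #       rmsd_file = property(rmsd_task.get_output_file_getter('output_analysis_file'))
    #
    # so that accessing md.rmsd_file runs the task on demand and returns its File.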

    def __repr__(self):
        """Represent the Task object with its flag."""
        return f'<Task ({self.flag})>'

    def __call__(self, parent: Union['Project', 'MD']):
        """Call the task, running it if necessary."""
        return self._run(parent)

    def _run(self, parent: Union['Project', 'MD']):
        # First of all check if this task has already been done in this very run
        # If so then return the stored value
        output = self._get_parent_output(parent)
        if output != MISSING_VALUE_EXCEPTION: return output
        # Process the task function arguments
        processed_args = {}
        # Get the task function expected arguments
        specification = getfullargspec(self.func)
        expected_arguments = specification.args
        n_default_arguments = len(specification.defaults) if specification.defaults else 0
        # Find out which arguments are optional since they have default values
        default_arguments = set(expected_arguments[::-1][:n_default_arguments])
        # If output_filenames are among the expected arguments then set them here
        writes_output_file = len(self.output_filenames) > 0
        output_files = {}
        for argument, output_filename in self.output_filenames.items():
            # Make sure the declared output filename is expected
            if argument not in expected_arguments:
                raise RuntimeError(f'Unexpected argument "{argument}" in function "{self.func}"')
            # Set the actual output filename, which may be calculated through a function
            if type(output_filename) == str:
                output_filepath = parent.pathify(output_filename)
            elif callable(output_filename):
                # If it is a function then it must be fed with the parent
                # DANI: There is only one case for this: the inpro topology_filepath
                # DANI: In this case the topology already comes with its path relative to the project
                # DANI: It must not be made relative to the MD
                output_filepath = output_filename(parent)
            else: raise RuntimeError(f'Unexpected output filename type "{type(output_filename)}"')
            # Set the output file
            if type(output_filepath) == str:
                output_file = File(output_filepath)
            elif type(output_filepath) == Exception:
                output_file = output_filepath
            else: raise RuntimeError(f'Unexpected output filepath type "{type(output_filepath)}"')
            output_files[argument] = output_file
            self._set_parent_output_file(parent, argument, output_file)
            # Add it to the processed args
            processed_args[argument] = output_file
            # Remove the expected argument from the list
            expected_arguments.remove(argument)
        # If one of the expected arguments is the output_directory then set it here
        # We will set a new directory with the flag name of the task, in the corresponding path
        # Note that while the task is being done the output directory has a different name
        # Thus the directory is hidden and marked as incomplete
        # The final output directory is the one without the incomplete prefix
        writes_output_dir = OUTPUT_DIRECTORY_ARG in expected_arguments
        incomplete_output_directory = None
        final_output_directory = None
        if writes_output_dir:
            # Set the output directory path
            incomplete_output_directory = parent.pathify(INCOMPLETE_PREFIX + self.flag)
            final_output_directory = parent.pathify(self.flag)
            # Add it to the processed args
            processed_args[OUTPUT_DIRECTORY_ARG] = incomplete_output_directory
            # Remove the expected argument from the list
            expected_arguments.remove(OUTPUT_DIRECTORY_ARG)
        # Iterate the remaining expected arguments
        for arg in expected_arguments:
            # First find the argument among the parent properties
            arg_value = self._find_arg_value(arg, parent, default_arguments)
            if arg_value == MISSING_ARGUMENT_EXCEPTION: continue
            # Add the processed argument
            processed_args[arg] = arg_value
        # Check again if the task has output already
        # It may happen that some dependencies assign output on their own
        # e.g. charges, bonds
        # If so then return the stored value
        output = self._get_parent_output(parent)
        if output != MISSING_VALUE_EXCEPTION: return output
        # Find out if we have cached output
        if self.use_cache:
            output = parent.cache.retrieve(self.cache_output_key, MISSING_VALUE_EXCEPTION)
            self._set_parent_output(parent, output)
        # Check if this dependency is to be overwritten
        forced_overwrite = self.flag in parent.overwritables
        changed_inputs, had_cache, cache_cksums = self._get_changed_inputs(parent, processed_args)
        any_input_changed = len(changed_inputs) > 0
        # Update the cache inputs
        parent.cache.update(self.cache_arg_cksums, cache_cksums)
        # We must overwrite outputs either if inputs changed or if it was forced by the user
        must_overwrite = forced_overwrite or any_input_changed
        # Check if output already exists
        # If the incomplete directory already exists then it means the task was started but not finished in a previous run
        existing_incomplete_output = writes_output_dir and exists(incomplete_output_directory)
        # If the final directory already exists then it means the task was done in a previous run
        existing_final_output = writes_output_dir and exists(final_output_directory)
        # If the output files already exist then it also means the task was done in a previous run
        existing_output_files = writes_output_file and \
            all( output_file == None or type(output_file) == Exception or output_file.exists
                for output_file in output_files.values() )
        # If we already have a cached output result
        existing_output_data = output != MISSING_VALUE_EXCEPTION
        # If we must overwrite then purge previous outputs
        if must_overwrite:
            if existing_incomplete_output: rmtree(incomplete_output_directory)
            if existing_final_output: rmtree(final_output_directory)
            if existing_output_files:
                for output_file in output_files.values():
                    if output_file == None or type(output_file) == Exception: continue
                    if output_file.exists: output_file.remove()
            if existing_output_data: parent.cache.delete(self.cache_output_key)
        # If already existing output is not to be overwritten then check if it is already what we need
        else:
            # If output files/directories are expected then they must exist
            # If output data is expected then it must be cached
            satisfied_output = (not writes_output_dir or existing_final_output) \
                and (not writes_output_file or existing_output_files) \
                and existing_output_data
            if self.debug:
                print(f'existing_output_data: {existing_output_data}')
                print(f'output: {output}')
            # If we already have the expected output then we can skip the task altogether
            if satisfied_output:
                print(f'{GREY_HEADER}-> Task {self.flag} ({self.name}) already completed{COLOR_END}')
                return output
        # If we are at this point then we are missing some output so we must proceed to run the task
        # Use the final output directory instead of the incomplete one if it exists
        # Note that we must check if it exists again since it may have been deleted since the last check
        if writes_output_dir and exists(final_output_directory):
            processed_args[OUTPUT_DIRECTORY_ARG] = final_output_directory
        # Create the incomplete output directory, if necessary
        missing_incomplete_output = writes_output_dir \
            and not exists(incomplete_output_directory) \
            and not exists(final_output_directory)
        if missing_incomplete_output: mkdir(incomplete_output_directory)
        # Finally call the function
        print(f'{GREEN_HEADER}-> Running task {self.flag} ({self.name}){COLOR_END}')
        start_time = time.time()
        # If the task is to be run again because any of its inputs changed then let the user know
        if any_input_changed and had_cache and not forced_overwrite:
            changes = ''.join([ '\n - ' + inp for inp in changed_inputs ])
            print(f'{GREEN_HEADER} The task is run again since the following inputs changed:{changes}{COLOR_END}')
        # Save a few internal values in the task even though the task is static
        # We save them right before calling the function in case the function uses this task as input
        self.changed_inputs = changed_inputs
        self.cache_cksums = cache_cksums
        # Run the actual task
        output = self.func(**processed_args)
        end_time = time.time()
        print(f' Task {self.flag} completed in {end_time - start_time:.2f} seconds{COLOR_END}')
        self._set_parent_output(parent, output)
        if self.use_cache:
            # Save the output in the cache unless it is marked not to be saved
            # Note that all values must be JSON serializable
            parent.cache.update(self.cache_output_key, output)
        # Update the overwritables so this task is not redone again in the same run
        parent.overwritables.discard(self.flag)
        # Change the incomplete directory name to its final name
        # We no longer remove the directory even if it is empty
        # The empty directory stands as proof that the task was run successfully
        # Thus its existence prevents the task from being run again
        # Note that some tasks clean their own intermediate steps to save disk space (e.g. inpro)
        if writes_output_dir and exists(incomplete_output_directory):
            rename(incomplete_output_directory, final_output_directory)
        return output

    def _find_arg_value(self, arg: str, parent: Union['Project', 'MD'], default_arguments: set):
        """Find argument values, thus running any dependency if necessary."""
        # Word 'task' is reserved for getting the task itself
        if arg == 'task': return self
        # Word 'self' is reserved for getting the caller Project/MD
        if arg == 'self': return parent
        # Check if the argument is an MD property
        arg_value = safe_getattr(parent, arg, MISSING_ARGUMENT_EXCEPTION)
        if arg_value != MISSING_ARGUMENT_EXCEPTION: return arg_value
        # If the parent is an MD then it may happen that the property belongs to the Project
        # We cannot use the 'isinstance' function here because we cannot import the MD class
        if parent.__class__.__name__ == 'MD':
            arg_value = safe_getattr(parent.project, arg, MISSING_ARGUMENT_EXCEPTION)
            if arg_value != MISSING_ARGUMENT_EXCEPTION: return arg_value
        # If the property is missing then search among the additional arguments
        arg_value = self.args.get(arg, MISSING_ARGUMENT_EXCEPTION)
        if arg_value != MISSING_ARGUMENT_EXCEPTION: return arg_value
        # It may also happen that the argument has a default value
        # If this is the case then we can skip it
        if arg in default_arguments: return MISSING_ARGUMENT_EXCEPTION
        # NEVER FORGET: Function arguments must have the same name as the Project/MD property
        # If the argument is still missing then you programmed the function wrongly or...
        # You may have forgotten the additional argument in the task args
        raise RuntimeError(f'Function "{self.func.__name__}" from task "{self.flag}" expects argument "{arg}" but it is missing')
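    # For instance (hypothetical function, assumed property names): a task function
    # declared as
    #
    #   def compute_density (structure_file, trajectory_file, frames_limit = 100):
    #       ...
    #
    # resolves 'structure_file' and 'trajectory_file' from the parent Project/MD
    # properties with those exact names, while 'frames_limit' may come from the
    # task's additional args or otherwise fall back to its default value.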

    def _get_changed_inputs(self,
        parent: Union['Project', 'MD'],
        processed_args: dict,
    ) -> tuple[list[str], bool, dict]:
        """Find out if input arguments changed with respect to the last run.

        Get the list of inputs which have changed compared to a previous run.
        If had_cache is False then it means this is the first time the task is ever done.

        Warning:
            Always get changed inputs, since this function updates the cache.
        """
        # Get cache argument references
        cache_cksums = parent.cache.retrieve(self.cache_arg_cksums, MISSING_VALUE_EXCEPTION)
        had_cache = False if cache_cksums == MISSING_VALUE_EXCEPTION else True
        if cache_cksums == MISSING_VALUE_EXCEPTION: cache_cksums = {}
        # Check argument by argument
        # Keep a list with the arguments which have changed
        unmatched_arguments = []
        for arg_name, arg_value in processed_args.items():
            # Skip the output directory argument and any output filename arguments
            # Changes in these arguments are not actual "input" changes
            if arg_name == OUTPUT_DIRECTORY_ARG: continue
            if arg_name in self.output_filenames: continue
            # Get the cksum from the new argument value
            new_cksum = get_cksum_id(arg_value)
            # Retrieve the cksum from the old argument value
            old_cksum = cache_cksums.get(arg_name, None)
            if self.debug: print(f'Task "{self.name}" -> argument "{arg_name}"\n' +
                f'  new value: {arg_value}\n' +
                f'  new value cksum: {new_cksum}\n' +
                f'  old value cksum: {old_cksum}\n' +
                f'  match: {new_cksum == old_cksum}')
            # Compare new and old cksums
            if new_cksum != old_cksum:
                # If we find a mismatch then add it to the list
                unmatched_arguments.append(arg_name)
            # Update the references
            cache_cksums[arg_name] = new_cksum
        return unmatched_arguments, had_cache, cache_cksums
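    # Illustrative return values (the cksums below are made up): on a first run there
    # is no cache, so had_cache is False and every argument is reported as changed:
    #
    #   (['structure_file', 'frames_limit'], False, {'structure_file': 'a1b2c3', 'frames_limit': 'd4e5f6'})
    #
    # On a later run where only the structure changed:
    #
    #   (['structure_file'], True, {'structure_file': '9f8e7d', 'frames_limit': 'd4e5f6'})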

    def prefill(self, parent: Union['Project', 'MD'], output, inputs):
        """Assign an output value to a task, thus marking it as already run.

        This function is used in a very specific scenario:
        tasks which generate no output files/directory inside the 'inpro' task.
        """
        # Save the task output in the parent instance
        self._set_parent_output(parent, output)
        # Update the cache output unless it is marked not to be saved
        if self.use_cache: parent.cache.update(self.cache_output_key, output)
        # Save input cksums to avoid repeating this task in future runs
        _, _, cache_cksums = self._get_changed_inputs(parent, inputs)
        parent.cache.update(self.cache_arg_cksums, cache_cksums)
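
    # A minimal usage sketch (the task and values are hypothetical): a parent task
    # such as 'inpro' may compute several results at once and hand them over to the
    # corresponding tasks so they are not recomputed later, e.g.
    #
    #   charges_task.prefill(project, output = computed_charges,
    #       inputs = { 'topology_file': topology_file })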