Coverage for model_workflow/mwf.py: 80%
1215 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1#!/usr/bin/env python
3# This is the starter script
5# Import python libraries
6from os import chdir, rename, remove, walk, mkdir, getcwd
7from os.path import exists, isdir, isabs, relpath
8from shutil import rmtree
9import sys
10import io
11import re
12import numpy
13from glob import glob
14from inspect import getfullargspec
16# Constants
17from model_workflow.utils.constants import *
19# Import local utils
20# Importing constants first is important
21from model_workflow.utils.constants import *
22#from model_workflow.utils.httpsf import mount
23from model_workflow.utils.auxiliar import InputError, MISSING_TOPOLOGY
24from model_workflow.utils.auxiliar import warn, load_json, load_yaml, list_files, is_directory_empty
25from model_workflow.utils.auxiliar import is_glob, parse_glob, safe_getattr
26from model_workflow.utils.arg_cksum import get_cksum_id
27from model_workflow.utils.register import Register
28from model_workflow.utils.cache import Cache
29from model_workflow.utils.structures import Structure
30from model_workflow.utils.topologies import Topology
31from model_workflow.utils.file import File
32from model_workflow.utils.remote import Remote
33from model_workflow.utils.pyt_spells import get_frames_count, get_average_structure
34from model_workflow.utils.selections import Selection
35from model_workflow.utils.mda_spells import get_mda_universe
36from model_workflow.utils.type_hints import *
38# Import local tools
39from model_workflow.tools.get_first_frame import get_first_frame
40from model_workflow.tools.get_bonds import find_safe_bonds, get_bonds_canonical_frame
41from model_workflow.tools.process_interactions import process_interactions
42from model_workflow.tools.find_interaction_types import find_interaction_types
43from model_workflow.tools.generate_metadata import prepare_project_metadata, generate_md_metadata
44from model_workflow.tools.generate_ligands_desc import generate_ligand_mapping
45from model_workflow.tools.chains import prepare_chain_references
46from model_workflow.tools.generate_pdb_references import prepare_pdb_references
47from model_workflow.tools.residue_mapping import generate_residue_mapping
48from model_workflow.tools.generate_map import generate_protein_mapping
49from model_workflow.tools.generate_lipid_references import generate_lipid_references
50from model_workflow.tools.generate_membrane_mapping import generate_membrane_mapping
51from model_workflow.tools.generate_topology import generate_topology
52from model_workflow.tools.get_charges import get_charges
53from model_workflow.tools.remove_trash import remove_trash
54from model_workflow.tools.get_screenshot import get_screenshot
55from model_workflow.tools.process_input_files import process_input_files
57# Import local analyses
58from model_workflow.analyses.rmsds import rmsds
59from model_workflow.analyses.tmscores import tmscores
60from model_workflow.analyses.rmsf import rmsf
61from model_workflow.analyses.rgyr import rgyr
62from model_workflow.analyses.pca import pca
63from model_workflow.analyses.density import density
64from model_workflow.analyses.thickness import thickness
65from model_workflow.analyses.area_per_lipid import area_per_lipid
66from model_workflow.analyses.lipid_order import lipid_order
67from model_workflow.analyses.lipid_interactions import lipid_interactions
68#from model_workflow.analyses.pca_contacts import pca_contacts
69from model_workflow.analyses.rmsd_per_residue import rmsd_per_residue
70from model_workflow.analyses.rmsd_pairwise import rmsd_pairwise
71from model_workflow.analyses.clusters import clusters_analysis
72from model_workflow.analyses.distance_per_residue import distance_per_residue
73from model_workflow.analyses.hydrogen_bonds import hydrogen_bonds
74from model_workflow.analyses.sasa import sasa
75from model_workflow.analyses.energies import energies
76from model_workflow.analyses.dihedral_energies import compute_dihedral_energies
77from model_workflow.analyses.pockets import pockets
78from model_workflow.analyses.rmsd_check import check_trajectory_integrity
79from model_workflow.analyses.helical_parameters import helical_parameters
80from model_workflow.analyses.markov import markov
# Force the standard output stream to be unbuffered
# Skip this inside interactive shells (Jupyter defines sys.ps1) and under pytest
if not (hasattr(sys, 'ps1') or 'pytest' in sys.modules):
    # Rewrap the stdout file descriptor with a zero-buffer binary stream
    # This makes prints appear on time in Slurm, where output logs may
    # otherwise be flushed only after the script has fully run
    # Note that this fix affects all modules and built-ins
    unbuffered = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)
    sys.stdout = unbuffered
# Sentinel exception instance flagging a missing input
# These sentinels are compared by identity, never raised
MISSING_INPUT_EXCEPTION = Exception('Missing input')
# Sentinel exception instance flagging a missing task function argument
# This is used for easy debug when a new function is added wrongly
MISSING_ARGUMENT_EXCEPTION = Exception('Missing argument')
# Sentinel exception instance flagging a missing value (e.g. output not yet computed/cached)
MISSING_VALUE_EXCEPTION = Exception('Missing value')
# Names of the reserved arguments used by all task functions to know where to write output
OUTPUT_FILEPATH_ARG = 'output_filepath'
OUTPUT_DIRECTORY_ARG = 'output_directory'
# Registries mapping requestable flags to tasks (and back)
# They are filled at the end of the module but are referred by previously defined functions
requestables = {}
inverted_requestables = {}
110# Set a class to handle a generic task
111# WARNING: Note that a task is static in relation to its Project/MD
112# This means that all MDs share the same task and thus it can not store internal values
113# Instead all its internal vallues are stored in the parent using a key name
114# This is not easy to change with the curent implementation
# Set a class to handle a generic task
# WARNING: Note that a task is static in relation to its Project/MD
# This means that all MDs share the same task and thus it can not store internal values
# Instead all its internal values are stored in the parent using a key name
# This is not easy to change with the current implementation
class Task:
    def __init__ (self,
        # The task flag
        # This name is to be used by the include/exclude/overwrite arguments
        # It will also name the folder containing all analysis output
        flag : str,
        # The task nice name
        # This user-friendly name is to be used in the logs
        name : str,
        # The task function
        # Function argument names must correspond with Project/MD property names
        func : Callable,
        # The task function "additional" inputs
        # Project/MD properties are automatically sent to the function as arguments
        # However some analyses have additional arguments (e.g. frames limit, cutoffs, etc.)
        # Note the default is None rather than {}: a mutable default dict would be
        # shared across every Task created without args, so mutations would leak between tasks
        args : Optional[dict] = None,
        # In case this task is to produce an output file, set here its name
        # The actual path relative to its project/MD will be set automatically
        # For those tasks which generate a directory with multiple outputs this is not necessary
        # However this may come in handy by tasks with a single file output
        # Specially when this output file is used later in this workflow
        output_filename : Optional[str] = None,
        # Set if the returned output is to be cached
        # Note that argument values are always cached, this is not optional
        use_cache : bool = True
    ):
        # Save input arguments
        self.flag = flag
        self.name = name
        self.func = func
        # Materialize the None default into a fresh per-task dict
        self.args = args if args is not None else {}
        self.output_filename = output_filename
        self.use_cache = use_cache
        # Set the keys used to store and retrieve data in the parent and cache
        self.parent_output_key = f'_{self.flag}_task_output'
        self.parent_output_filepath_key = f'{self.flag}_task_output_filepath'
        self.cache_output_key = f'{self.flag}_task_output'
        self.cache_arg_cksums = f'{self.flag}_task_arg_cksums'
        # Required by the argument cksum logic, which reads __name__
        self.__name__ = self.flag

    # Set internal functions to handle parent saved output
    # This output is not saved in the task itself, but in the parent, because the task is static
    # Sentinel checks below use identity ('is'/'is not') and never equality:
    # outputs such as numpy arrays overload '==' and would break the comparison
    def _get_parent_output (self, parent):
        return safe_getattr(parent, self.parent_output_key, MISSING_VALUE_EXCEPTION)
    def _set_parent_output (self, parent, new_output):
        return setattr(parent, self.parent_output_key, new_output)
    def _get_parent_output_filepath (self, parent):
        return safe_getattr(parent, self.parent_output_filepath_key, MISSING_VALUE_EXCEPTION)
    def _set_parent_output_filepath (self, parent, new_filepath):
        return setattr(parent, self.parent_output_filepath_key, new_filepath)

    # Get the task output, running the task if necessary
    def get_output (self, parent):
        # If we already have a value stored from this run then return it
        output = self._get_parent_output(parent)
        if output is not MISSING_VALUE_EXCEPTION: return output
        # Otherwise run the task and return the output
        return self(parent)
    output = property(get_output, None, None, "Task output (read only)")

    # Asking for the output file or filepath implies running the task, then returning the file/filepath
    def get_output_filepath (self, parent) -> str:
        # If we already have a filepath stored from this run then return it
        filepath = self._get_parent_output_filepath(parent)
        if filepath is not MISSING_VALUE_EXCEPTION: return filepath
        # Otherwise run the task and return the filepath
        self(parent)
        filepath = self._get_parent_output_filepath(parent)
        if filepath is not MISSING_VALUE_EXCEPTION: return filepath
        raise ValueError(f'Task {self.flag} has no output filepath after running')
    output_filepath = property(get_output_filepath, None, None, "Task output filepath (read only)")

    def get_output_file (self, parent) -> 'File':
        # Note the return annotation: this returns a File object, not a str
        filepath = self.get_output_filepath(parent)
        return File(filepath)
    output_file = property(get_output_file, None, None, "Task output file (read only)")

    # When the task is printed, show the flag
    def __repr__ (self): return f'<Task ({self.flag})>'

    # When a task is called
    def __call__(self, parent):
        # First of all check if this task has been already done in this very run
        # If so then return the stored value
        output = self._get_parent_output(parent)
        if output is not MISSING_VALUE_EXCEPTION: return output
        # Process the task function arguments
        processed_args = {}
        # Get the task function expected arguments
        specification = getfullargspec(self.func)
        expected_arguments = specification.args
        n_default_arguments = len(specification.defaults) if specification.defaults else 0
        # Find out which arguments are optional since they have default values
        # Defaults apply to the trailing arguments, hence the reversed slice
        default_arguments = set(expected_arguments[::-1][:n_default_arguments])
        # If one of the expected arguments is the output filepath then set it here
        output_filepath = None
        writes_output_file = OUTPUT_FILEPATH_ARG in expected_arguments
        if writes_output_file:
            # The task should have a defined output file
            if not self.output_filename:
                raise RuntimeError(f'Task {self.flag} must have an "output_filename"')
            # Set the output file path
            output_filepath = parent.pathify(self.output_filename)
            self._set_parent_output_filepath(parent, output_filepath)
            # Add it to the processed args
            processed_args[OUTPUT_FILEPATH_ARG] = output_filepath
            # Remove the expected argument from the list
            expected_arguments.remove(OUTPUT_FILEPATH_ARG)
        # If one of the expected arguments is the output directory then set it here
        # We will set a new directory with the flag name of the task, in the corresponding path
        # Note that while the task is being done the output directory has a different name
        # Thus the directory is hidden and marked as incomplete
        # The final output directory is the one without the incomplete prefix
        writes_output_dir = OUTPUT_DIRECTORY_ARG in expected_arguments
        incomplete_output_directory = None
        final_output_directory = None
        if writes_output_dir:
            # Set the output directory path
            incomplete_output_directory = parent.pathify(INCOMPLETE_PREFIX + self.flag)
            final_output_directory = incomplete_output_directory.replace(INCOMPLETE_PREFIX, '')
            # Add it to the processed args
            processed_args[OUTPUT_DIRECTORY_ARG] = incomplete_output_directory
            # Remove the expected argument from the list
            expected_arguments.remove(OUTPUT_DIRECTORY_ARG)
        # Iterate the remaining expected arguments
        for arg in expected_arguments:
            # First find the argument among the parent properties
            arg_value = self.find_arg_value(arg, parent, default_arguments)
            if arg_value is MISSING_ARGUMENT_EXCEPTION: continue
            # Add the processed argument
            processed_args[arg] = arg_value
        # Check again if the task has output already
        # It may happen that some dependencies assign output on their own
        # e.g. charges, bonds
        # If so then return the stored value
        output = self._get_parent_output(parent)
        if output is not MISSING_VALUE_EXCEPTION: return output
        # Find if we have cached output
        if self.use_cache:
            output = parent.cache.retrieve(self.cache_output_key, MISSING_VALUE_EXCEPTION)
            self._set_parent_output(parent, output)
        # Check if this dependency is to be overwritten
        forced_overwrite = self.flag in parent.overwritables
        # Get the list of inputs which have changed compared to a previous run
        # WARNING: Always get changed inputs, since this function updates the cache
        # If had_cache is false then it means this is the first time the task is ever done
        changed_inputs, had_cache, cache_cksums = self.get_changed_inputs(parent, processed_args)
        any_input_changed = len(changed_inputs) > 0
        # Update the cache inputs
        parent.cache.update(self.cache_arg_cksums, cache_cksums)
        # We must overwrite outputs either if inputs changed or if it was forced by the user
        must_overwrite = forced_overwrite or any_input_changed
        # Check if output already exists
        # If the incomplete directory already exists then it means the task was started in a previous run
        existing_incomplete_output = writes_output_dir and exists(incomplete_output_directory)
        # If the final directory already exists then it means the task was done in a previous run
        existing_final_output = writes_output_dir and exists(final_output_directory)
        # If the output file already exists then it also means the task was done in a previous run
        existing_output_file = writes_output_file and exists(output_filepath)
        # If we already have a cached output result
        existing_output_data = output is not MISSING_VALUE_EXCEPTION
        # If we must overwrite then purge previous outputs
        if must_overwrite:
            if existing_incomplete_output: rmtree(incomplete_output_directory)
            if existing_final_output: rmtree(final_output_directory)
            if existing_output_file: remove(output_filepath)
            if existing_output_data: parent.cache.delete(self.cache_output_key)
        # If already existing output is not to be overwritten then check if it is already what we need
        else:
            # If output files/directories are expected then they must exist
            # If output data is expected then it must be cached
            satisfied_output = (not writes_output_dir or exists(final_output_directory)) \
                and (not writes_output_file or exists(output_filepath)) \
                and (output is not MISSING_VALUE_EXCEPTION)
            # If we already have the expected output then we can skip the task at all
            if satisfied_output:
                print(f'{GREY_HEADER}-> Task {self.flag} ({self.name}) already completed{COLOR_END}')
                return output
        # If we are at this point then we are missing some output so we must proceed to run the task
        # Use the final output directory instead of the incomplete one if exists
        # Note that we must check if it exists again since it may have been deleted since the last check
        if writes_output_dir and exists(final_output_directory):
            processed_args[OUTPUT_DIRECTORY_ARG] = final_output_directory
        # Create the incomplete output directory, if necessary
        missing_incomplete_output = writes_output_dir \
            and not exists(incomplete_output_directory) \
            and not exists(final_output_directory)
        if missing_incomplete_output: mkdir(incomplete_output_directory)
        # Finally call the function
        print(f'{GREEN_HEADER}-> Running task {self.flag} ({self.name}){COLOR_END}')
        # If the task is to be run again because an input changed then let the user know
        if any_input_changed and had_cache and not forced_overwrite:
            changes = ''.join([ '\n - ' + inp for inp in changed_inputs ])
            print(f'{GREEN_HEADER}   The task is run again since the following inputs changed:{changes}{COLOR_END}')
        # Save a few internal values in the task although the task is static
        # We save it right before calling the function in case the function uses this task as input
        self.changed_inputs = changed_inputs
        self.cache_cksums = cache_cksums
        # Run the actual task
        output = self.func(**processed_args)
        self._set_parent_output(parent, output)
        # Update cache output unless it is marked to not save it
        if self.use_cache: parent.cache.update(self.cache_output_key, output)
        # Update the overwritables so this is not remade further in the same run
        parent.overwritables.discard(self.flag)
        # As a brief cleanup, if the output directory is empty at the end, then remove it
        # Otherwise, change the incomplete directory name to its final name
        if writes_output_dir and exists(incomplete_output_directory):
            if is_directory_empty(incomplete_output_directory): rmtree(incomplete_output_directory)
            else: rename(incomplete_output_directory, final_output_directory)
        # Now return the function result
        return output

    # Find argument values, thus running any dependency
    def find_arg_value (self, arg : str, parent : Union['Project', 'MD'], default_arguments : set):
        # Word 'task' is reserved for getting the task itself
        if arg == 'task': return self
        # Word 'self' is reserved for getting the caller Project/MD
        if arg == 'self': return parent
        # Check if the argument is an MD property
        arg_value = safe_getattr(parent, arg, MISSING_ARGUMENT_EXCEPTION)
        if arg_value is not MISSING_ARGUMENT_EXCEPTION: return arg_value
        # If the parent is an MD then it may happen the property is from the Project
        if isinstance(parent, MD):
            arg_value = safe_getattr(parent.project, arg, MISSING_ARGUMENT_EXCEPTION)
            if arg_value is not MISSING_ARGUMENT_EXCEPTION: return arg_value
        # If the property is missing then search among the additional arguments
        arg_value = self.args.get(arg, MISSING_ARGUMENT_EXCEPTION)
        if arg_value is not MISSING_ARGUMENT_EXCEPTION: return arg_value
        # It may also happen that the argument has a default value
        # If this is the case then we can skip it
        if arg in default_arguments: return MISSING_ARGUMENT_EXCEPTION
        # NEVER FORGET: Function arguments must have the same name that the Project/MD property
        # If the argument is still missing then you programmed the function wrongly or...
        # You may have forgotten the additional argument in the task args
        raise RuntimeError(f'Function "{self.func.__name__}" from task "{self.flag}" expects argument "{arg}" but it is missing')

    # Find out if inputs changed regarding the last run
    # Returns the changed argument names, whether a previous cache existed,
    # and the updated cksums dict (to be written back to the cache by the caller)
    def get_changed_inputs (self,
        parent : Union['Project', 'MD'],
        processed_args : dict) -> Tuple[ List[str], bool, dict ]:
        # Get cache argument references
        cache_cksums = parent.cache.retrieve(self.cache_arg_cksums, MISSING_VALUE_EXCEPTION)
        had_cache = cache_cksums is not MISSING_VALUE_EXCEPTION
        if cache_cksums is MISSING_VALUE_EXCEPTION: cache_cksums = {}
        # Check argument by argument
        # Keep a list with arguments which have changed
        unmatched_arguments = []
        for arg_name, arg_value in processed_args.items():
            # Skip the output directory argument
            # Changes in this argument are not actual changes
            if arg_name == OUTPUT_DIRECTORY_ARG: continue
            # Get the cksum from the new argument value
            new_cksum = get_cksum_id(arg_value)
            # Retrieve the cksum from the old argument value
            old_cksum = cache_cksums.get(arg_name, None)
            # Compare new and old cksums
            if new_cksum != old_cksum:
                # If we found a mismatch then add it to the list
                unmatched_arguments.append(arg_name)
            # Update the references
            cache_cksums[arg_name] = new_cksum
        return unmatched_arguments, had_cache, cache_cksums
377# A Molecular Dynamics (MD) is the union of a structure and a trajectory
378# Having this data several analyses are possible
379# Note that an MD is always defined inside of a Project and thus it has additional topology and metadata
380class MD:
    def __init__ (self,
        # The parent project this MD belongs to
        project : 'Project',
        # The number of the MD according to its accession
        number : int,
        # The local directory where the MD takes place
        directory : str,
        # Input structure and trajectory files
        input_structure_filepath : str,
        input_trajectory_filepaths : List[str],
    ):
        """Build an MD bound to its parent Project: resolve its directory,
        accession/remote endpoint, register and cache."""
        # Save the inputs
        self.project = project
        if not project:
            raise Exception('Project is mandatory to instantiate a new MD')
        # Save the MD number
        self.number = number
        # Set the MD accession and request URL
        # These stay None unless the project knows both its database URL and its accession
        self.accession = None
        self.remote = None
        if self.project.database_url and self.project.accession:
            self.accession = f'{self.project.accession}.{self.number}'
            self.remote = Remote(self.project.database_url, self.accession)
        # Save the directory
        # If it is an absolute path then make it relative to the project
        if isabs(directory):
            # This function already removes the final slash
            self.directory = relpath(directory, self.project.directory)
        # Otherwise save it as is but just removing the final slash (if any)
        else:
            self.directory = remove_final_slash(directory)
        # Anchor the (now relative) directory under the project directory
        self.directory = self.project.pathify(self.directory)
        # If the directory does not exist then create it
        if not exists(self.directory):
            mkdir(self.directory)
        # Save the input structure filepath
        # They may be relative to the project directory (unique) or relative to the MD directory (one per MD)
        # If the path is absolute then it is considered unique
        # If the file does not exist and it is to be downloaded then it is downloaded for each MD
        # Prioritize the MD directory over the project directory
        self.input_structure_filepath = input_structure_filepath
        # Internal variable for the input structure file, assigned lazily by get_input_structure_file
        self._input_structure_file = None
        # Save the input trajectory filepaths
        self.input_trajectory_filepaths = input_trajectory_filepaths
        # Internal variable for the input trajectory files, assigned lazily
        self._input_trajectory_files = None
        # Processed structure and trajectory files (set during processing)
        self._structure_file = None
        self._trajectory_file = None
        # Other values which may be found/calculated on demand
        self._md_inputs = None
        self._structure = None
        # Tests
        self._trajectory_integrity = None
        # Set a new MD specific register
        # In case the directory is the project directory itself, reuse the project register
        register_filepath = self.pathify(REGISTER_FILENAME)
        register_file = File(register_filepath)
        if register_file.path == self.project.register.file.path:
            self.register = self.project.register
        else:
            self.register = Register(register_file)
        # Save also warnings apart since they are to be used as an input for metadata tasks
        self.warnings = self.register.warnings
        # Set a new MD specific cache
        # In case the directory is the project directory itself, reuse the project cache
        cache_filepath = self.pathify(CACHE_FILENAME)
        cache_file = File(cache_filepath)
        if cache_file.path == self.project.cache.file.path:
            self.cache = self.project.cache
        else:
            self.cache = Cache(cache_file)
        # Set of task flags whose output is to be overwritten in this run
        self.overwritables = set()
463 def __repr__ (self):
464 return 'MD'
466 # Given a filename or relative path, add the MD directory path at the beginning
467 def pathify (self, filename_or_relative_path : str) -> str:
468 return self.directory + '/' + filename_or_relative_path
470 # Input structure file ------------
472 def get_input_structure_filepath (self) -> str:
473 """Set a function to get input structure file path"""
474 # Set a function to find out if a path is relative to MD directories or to the project directory
475 # To do so just check if the file exists in any of those
476 # In case it exists in both or none then assume it is relative to MD directory
477 # Parse glob notation in the process
478 def relativize_and_parse_paths (input_path : str, may_not_exist : bool = False) -> Optional[str]:
479 # Check if it is an absolute path
480 if isabs(input_path):
481 abs_glob_parse = parse_glob(input_path)
482 # If we had multiple results then we complain
483 if len(abs_glob_parse) > 1:
484 raise InputError(f'Multiple structures found with "{input_path}": {", ".join(abs_glob_parse)}')
485 # If we had no results then we complain
486 if len(abs_glob_parse) == 0:
487 if self.remote:
488 warn('Spread syntax is not supported to download remote files')
489 raise InputError(f'No structure found with "{input_path}"')
490 abs_parsed_filepath = abs_glob_parse[0]
491 return abs_parsed_filepath
492 # Check the MD directory
493 md_relative_filepath = self.pathify(input_path)
494 md_glob_parse = parse_glob(md_relative_filepath)
495 if len(md_glob_parse) > 1:
496 raise InputError(f'Multiple structures found with "{input_path}": {", ".join(md_glob_parse)}')
497 md_parsed_filepath = md_glob_parse[0] if len(md_glob_parse) == 1 else None
498 if md_parsed_filepath and File(md_parsed_filepath).exists:
499 return md_parsed_filepath
500 # Check the project directory
501 project_relative_filepath = self.project.pathify(input_path)
502 project_glob_parse = parse_glob(project_relative_filepath)
503 if len(project_glob_parse) > 1:
504 raise InputError(f'Multiple structures found with "{input_path}": {", ".join(project_glob_parse)}')
505 project_parsed_filepath = project_glob_parse[0] if len(project_glob_parse) == 1 else None
506 if project_parsed_filepath and File(project_parsed_filepath).exists:
507 return project_parsed_filepath
508 # At this point we can conclude the input structure file does not exist
509 # If we have no paths at all then it means a glob pattern was passed and it didn't match
510 # Note that if a glob pattern existed then it would mean the file actually existed
511 if len(md_glob_parse) == 0 and len(project_glob_parse) == 0:
512 # Warn the user in case it was trying to use glob syntax to donwload remote files
513 if self.remote:
514 warn('Spread syntax is not supported to download remote files')
515 raise InputError('No trajectory file was reached neither in the project directory or MD directories in path(s) ' + ', '.join(input_path))
516 # If the path does not exist anywhere then we asume it will be downloaded and set it relative to the MD
517 # However make sure we have a remote
518 # As an exception, if the 'may not exist' flag is passed then we return the result even if there is no remote
519 if not may_not_exist and not self.remote:
520 raise InputError(f'Cannot find a structure file by "{input_path}" anywhere')
521 return md_parsed_filepath
522 # If we have a value passed through command line
523 if self.input_structure_filepath:
524 # Find out if it is relative to MD directories or to the project directory
525 return relativize_and_parse_paths(self.input_structure_filepath)
526 # If we have a value passed through the inputs file has the value
527 if self.project.is_inputs_file_available():
528 # Get the input value, whose key must exist
529 inputs_value = self.project.get_input('input_structure_filepath')
530 # If there is a valid input then use it
531 if inputs_value: return relativize_and_parse_paths(inputs_value)
532 # If there is not input structure then asume it is the default
533 # Check the default structure file exists or it may be downloaded
534 default_structure_filepath = relativize_and_parse_paths(STRUCTURE_FILENAME, may_not_exist=True)
535 default_structure_file = File(default_structure_filepath)
536 # AGUS: si default_structure_filepath es None, default_structure_file será un objeto File y no se puede evaluar como None
537 # AGUS: de esta forma al evaluar directamente si default_structure_filepath es None, se evita el error
538 if default_structure_filepath is not None:
539 if default_structure_file.exists or self.remote:
540 return default_structure_filepath
541 # If there is not input structure anywhere then use the input topology
542 # We will extract the structure from it using a sample frame from the trajectory
543 # Note that topology input filepath must exist and an input error will raise otherwise
544 # However if we are using the standard topology file we can not extract the PDB from it (yet)
545 if self.project.input_topology_file.filename != STANDARD_TOPOLOGY_FILENAME:
546 return self.project.input_topology_file.path
547 # If we can not use the topology either then surrender
548 raise InputError('There is not input structure at all')
    def get_input_structure_file (self) -> 'File':
        """Get the input structure file as a File object (note: not a str).
        If the file is not found locally then try to download it."""
        # If the input structure file is already defined then return it (lazy memoization)
        if self._input_structure_file:
            return self._input_structure_file
        # Otherwise we must set it
        # First set the input structure filepath
        input_structure_filepath = self.get_input_structure_filepath()
        # Now set the input structure file
        self._input_structure_file = File(input_structure_filepath)
        # If the file already exists then return it
        if self._input_structure_file.exists:
            return self._input_structure_file
        # Try to download it
        # If we do not have the required parameters to download it then we surrender here
        if not self.remote:
            raise InputError(f'Missing input structure file "{self._input_structure_file.path}"')
        # Download the structure
        # If the structure filename is the standard structure filename then use the structure endpoint instead
        if self._input_structure_file.filename == STRUCTURE_FILENAME:
            self.remote.download_standard_structure(self._input_structure_file)
        # Otherwise download the input structure file by its filename
        else:
            self.remote.download_file(self._input_structure_file)
        return self._input_structure_file
    input_structure_file = property(get_input_structure_file, None, None, "Input structure filename (read only)")
578 # Input trajectory filename ------------
def get_input_trajectory_filepaths (self) -> List[str]:
    """Resolve the input trajectory file paths.

    Values may come from the command line, from the inputs file, or default to
    the standard trajectory filename. Glob notation is parsed, and relative
    paths are resolved against the MD directory first and the project
    directory second.

    Raises InputError when no trajectory file can be reached at all.
    """
    # Check and fix input trajectory filepaths
    # Also relativize paths to the current MD directory and parse glob notation
    def relativize_and_parse_paths (input_paths : List[str]) -> List[str]:
        # Input trajectory filepaths may be both a list or a single string
        # However we must keep a list
        if isinstance(input_paths, list):
            checked_paths = input_paths
        elif isinstance(input_paths, str):
            checked_paths = [ input_paths ]
        else:
            raise InputError('Input trajectory filepaths must be a list of strings or a string')
        # Make sure all or none of the trajectory paths are absolute
        absolute_count = sum(isabs(path) for path in checked_paths)
        if absolute_count != 0 and absolute_count != len(checked_paths):
            raise InputError('All trajectory frames must be relative or absolute. Mixing is not supported')
        # Glob-parse and merge all paths
        def parse_all_glob (paths : List[str]) -> List[str]:
            parsed_paths = []
            for path in paths:
                parsed_paths += parse_glob(path)
            return parsed_paths
        # In case trajectory paths are absolute
        if absolute_count > 0:
            absolute_parsed_paths = parse_all_glob(checked_paths)
            # Check we successfully defined some trajectory file
            if len(absolute_parsed_paths) == 0:
                # Warn the user in case it was trying to use glob syntax to download remote files
                if self.remote:
                    warn('Spread syntax is not supported to download remote files')
                raise InputError('No trajectory file was reached neither in the project directory or MD directories in path(s) ' + ', '.join(input_paths))
            return absolute_parsed_paths
        # If trajectory paths are not absolute then check if they are relative to the MD directory
        # Get paths relative to the current MD directory
        md_relative_paths = [ self.pathify(path) for path in checked_paths ]
        # In case there are glob characters we must parse the paths
        md_parsed_paths = parse_all_glob(md_relative_paths)
        # Check we successfully defined some trajectory file
        if len(md_parsed_paths) > 0:
            # If so, check at least one of the files does actually exist
            if any(File(path).exists for path in md_parsed_paths):
                return md_parsed_paths
        # If no trajectory files were found then assume they are relative to the project
        # Get paths relative to the project directory
        project_relative_paths = [ self.project.pathify(path) for path in checked_paths ]
        # In case there are glob characters we must parse the paths
        project_parsed_paths = parse_all_glob(project_relative_paths)
        # Check we successfully defined some trajectory file
        if len(project_parsed_paths) > 0:
            # If so, check at least one of the files does actually exist
            if any(File(path).exists for path in project_parsed_paths):
                return project_parsed_paths
        # At this point we can conclude the input trajectory file does not exist
        # If we have no paths at all then it means a glob pattern was passed and it didn't match
        # Note that if a glob pattern existed then it would mean the file actually existed
        if len(md_parsed_paths) == 0 and len(project_parsed_paths) == 0:
            # Warn the user in case it was trying to use glob syntax to download remote files
            if self.remote:
                warn('Spread syntax is not supported to download remote files')
            raise InputError('No trajectory file was reached neither in the project directory or MD directories in path(s) ' + ', '.join(input_paths))
        # If we have a path however it may be downloaded from the database if we have a remote
        if not self.remote:
            raise InputError(f'Cannot find anywhere a trajectory file with path(s) "{", ".join(input_paths)}"')
        # In this case we set the path as MD relative
        # Note that if input path was not glob based it will be both as project relative and MD relative
        if len(md_parsed_paths) == 0: raise ValueError('This should never happen')
        return md_parsed_paths
    # If we have a value passed through command line
    if self.input_trajectory_filepaths:
        return relativize_and_parse_paths(self.input_trajectory_filepaths)
    # Check if the inputs file has the value
    if self.project.is_inputs_file_available():
        # Get the input value
        inputs_value = self.project.get_input('input_trajectory_filepaths')
        if inputs_value:
            return relativize_and_parse_paths(inputs_value)
    # If no input trajectory is passed then assume it is the default
    default_trajectory_filepath = self.pathify(TRAJECTORY_FILENAME)
    default_trajectory_file = File(default_trajectory_filepath)
    if default_trajectory_file.exists or self.remote:
        return relativize_and_parse_paths([ TRAJECTORY_FILENAME ])
    # If there is no trajectory available then we surrender
    raise InputError('There is not input trajectory at all')
def get_input_trajectory_files (self) -> List['File']:
    """Get the input trajectory file(s) from the inputs.
    If the file(s) are not found then try to download them.

    Raises InputError when files are missing and no remote is configured.
    """
    # If we already defined input trajectory files then return them
    if self._input_trajectory_files is not None:
        return self._input_trajectory_files
    # Otherwise we must set the input trajectory files
    input_trajectory_filepaths = self.get_input_trajectory_filepaths()
    self._input_trajectory_files = [ File(path) for path in input_trajectory_filepaths ]
    # Find missing trajectory files
    missing_input_trajectory_files = [ trajectory_file
        for trajectory_file in self._input_trajectory_files if not trajectory_file.exists ]
    # If all files already exist then we are done
    if len(missing_input_trajectory_files) == 0:
        return self._input_trajectory_files
    # Try to download the missing files
    # If we do not have the required parameters to download them then we surrender here
    if not self.remote:
        missing_filepaths = [ trajectory_file.path for trajectory_file in missing_input_trajectory_files ]
        raise InputError('Missing input trajectory files: ' + ', '.join(missing_filepaths))
    # Download each missing trajectory file (usually it will be just one)
    # Note that files which already exist are not downloaded again
    for trajectory_file in missing_input_trajectory_files:
        # If this is the main trajectory (the usual one) then use the dedicated endpoint
        if trajectory_file.filename == TRAJECTORY_FILENAME:
            # Request only a sample of frames when a trajectory sample was requested
            frame_selection = f'1:{self.project.sample_trajectory}:1' if self.project.sample_trajectory else None
            self.remote.download_trajectory(trajectory_file, frame_selection=frame_selection, format='xtc')
        # Otherwise, download it by its filename
        else:
            self.remote.download_file(trajectory_file)
    return self._input_trajectory_files
input_trajectory_files = property(get_input_trajectory_files, None, None, "Input trajectory filenames (read only)")
def get_md_inputs (self) -> dict:
    """MD specific inputs, matched by this MD's directory."""
    # Return the cached value when available
    if self._md_inputs:
        return self._md_inputs
    # Search the inputs file MDs for the one whose directory matches this MD
    # Note the inputs may be missing entirely, in which case we skip the search
    for md_input in (self.project.input_mds or []):
        # The directory may be declared explicitly in the inputs
        md_directory = md_input.get(MD_DIRECTORY, None)
        if md_directory:
            check_directory(md_directory)
        # When no directory is declared we guess it from the MD name
        else:
            md_name = md_input.get('name', None)
            if not md_name: raise InputError('There is a MD with no name and no directory. Please define at least one of them.')
            md_directory = name_2_directory(md_name)
        # A matching directory means these are our MD inputs
        if md_directory == self.directory:
            self._md_inputs = md_input
            return self._md_inputs
    # If this MD directory has not associated inputs then it means it was forced through command line
    # We set a provisional MD inputs for it
    provisional_name = directory_2_name(self.directory)
    self._md_inputs = { 'name': provisional_name }
    return self._md_inputs
md_inputs = property(get_md_inputs, None, None, "MD specific inputs (read only)")
732 # ---------------------------------
def get_file (self, target_file : File) -> bool:
    """Return True when a file is available locally, downloading it from the
    database when possible. A file missing from the database is fine and we do
    not even warn the user. Note that this function is used to get populations
    and transitions files, which are not common."""
    # Nothing to do when the file is already there
    if target_file.exists:
        return True
    # Without remote parameters there is no way to fetch the missing file
    if not self.remote:
        return False
    # Give up silently when the remote does not list this file
    if target_file.filename not in self.remote.available_files:
        return False
    # Otherwise fetch it from the database
    self.remote.download_file(target_file)
    return True
def print_tests_summary (self):
    """Print a pretty summary with the status of every available test."""
    print('Tests summary:')
    for test_name in AVAILABLE_CHECKINGS:
        test_result = self.register.tests.get(test_name, None)
        # Translate the raw test result into a colored human friendly label
        if test_result == None:
            label = YELLOW_HEADER + 'Not run' + COLOR_END
        elif test_result == True:
            label = GREEN_HEADER + 'Passed' + COLOR_END
        elif test_result == False:
            label = RED_HEADER + 'Failed' + COLOR_END
        elif test_result == 'na':
            label = BLUE_HEADER + 'Not applicable' + COLOR_END
        # Any other value is unexpected
        else:
            raise ValueError()
        print(f' - {NICE_NAMES[test_name]} -> {label}')
# Issue some warnings if failed or never run tests are skipped
# This is run after processing input files
def _issue_required_test_warnings (self):
    """Add a register warning for every skipped ("trusted") test which has never run.

    Tests which already passed or failed in a previous run are left untouched,
    since failed tests already carry their own inherited warnings.
    """
    for test_name in AVAILABLE_CHECKINGS:
        # If test was not skipped then proceed
        if test_name not in self.project.trust: continue
        # If test passed in a previous run then proceed
        test_result = self.register.tests.get(test_name)
        if test_result == True: continue
        # If test failed in a previous run we can also proceed
        # The failing warning must be among the inherited warnings, so there is no need to add more warnings here
        if test_result == False: continue
        # If the test has been always skipped then issue a warning
        if test_result == None:
            # Remove previous warnings for this test
            self.register.remove_warnings(test_name)
            # Get test pretty name
            test_nice_name = NICE_NAMES[test_name]
            # Issue the corresponding warning
            self.register.add_warning(test_name, test_nice_name + ' was skipped and never run before')
            continue
        # NOTE(review): an 'na' result (accepted by print_tests_summary) would reach this
        # point and raise — confirm 'na' can never coexist with a skipped test
        raise ValueError('Test value is not supported')
# Processed files ----------------------------------------------------

# Run the actual processing to generate output processed files out of input raw files
# And by "files" I mean structure, trajectory and topology
# Declared as a Task so the workflow machinery caches and registers the run
process_input_files = Task('inpro', 'Process input files', process_input_files)
def get_structure_file (self) -> 'File':
    """Get the processed structure file, running the input processing if needed.

    When the 'faith' flag is set the input structure is taken as already
    processed, so it must match the expected output file and exist.
    """
    # If we have a stored value then return it
    # This means we already found or generated this file
    if self._structure_file:
        return self._structure_file
    # Set the expected output file
    structure_filepath = self.pathify(STRUCTURE_FILENAME)
    self._structure_file = File(structure_filepath)
    # If the faith flag was passed then simply make sure the input file makes sense
    if self.project.faith:
        if self.input_structure_file != self._structure_file:
            raise InputError('Input structure file is not equal to output structure file but the "faith" flag was used.\n'
                ' Please refrain from using the faith argument (-f) if you ignore its effect.')
        if not self.input_structure_file.exists:
            raise InputError('Input structure file does not exist but the "faith" flag was used.\n'
                ' Please refrain from using the faith argument (-f) if you ignore its effect.')
        return self._structure_file
    # Run the processing logic
    self.process_input_files(self)
    # Now that the file is sure to exist we return it
    return self._structure_file
structure_file = property(get_structure_file, None, None, "Structure file (read only)")
def get_trajectory_file (self) -> 'File':
    """Get the processed trajectory file, running the input processing if needed.

    When the 'faith' flag is set the single input trajectory is taken as
    already processed, so it must match the expected output file and exist.
    """
    # If we have a stored value then return it
    # This means we already found or generated this file
    if self._trajectory_file:
        return self._trajectory_file
    # Set the expected output file
    trajectory_filepath = self.pathify(TRAJECTORY_FILENAME)
    self._trajectory_file = File(trajectory_filepath)
    # If the faith flag was passed then simply make sure the input file makes sense
    if self.project.faith:
        if len(self.input_trajectory_files) > 1:
            raise InputError('Several input trajectory files but the "faith" flag was used.\n'
                ' Please refrain from using the faith argument (-f) if you ignore its effect.')
        sample = self.input_trajectory_files[0]
        if sample != self._trajectory_file:
            raise InputError('Input trajectory file is not equal to output trajectory file but the "faith" flag was used.\n'
                ' Please refrain from using the faith argument (-f) if you ignore its effect.')
        if not self._trajectory_file.exists:
            raise InputError('Input trajectory file does not exist but the "faith" flag was used.\n'
                ' Please refrain from using the faith argument (-f) if you ignore its effect.')
        return self._trajectory_file
    # Run the processing logic
    self.process_input_files(self)
    # Now that the file is sure to exist we return it
    return self._trajectory_file
trajectory_file = property(get_trajectory_file, None, None, "Trajectory file (read only)")
# Get the processed topology from the project
# NOTE(review): annotated '-> str' but presumably returns the same type as
# Project.get_topology_file — confirm against the Project implementation
def get_topology_file (self) -> str:
    """Delegate to the project, which owns the shared topology file."""
    return self.project.get_topology_file()
topology_file = property(get_topology_file, None, None, "Topology filename from the project (read only)")
860 # ---------------------------------------------------------------------------------
861 # Others values which may be found/calculated and files to be generated on demand
862 # ---------------------------------------------------------------------------------
# Trajectory snapshots (frame count), computed lazily through a cached Task
get_snapshots = Task('frames', 'Count trajectory frames', get_frames_count)
snapshots = property(get_snapshots, None, None, "Trajectory snapshots (read only)")
# Safe bonds
def get_reference_bonds (self) -> List[List[int]]:
    """Inherit the trusted atom bonds from the parent project."""
    return self.project.reference_bonds
reference_bonds = property(get_reference_bonds, None, None, "Atom bonds to be trusted (read only)")
# Parsed structure
def get_structure (self) -> 'Structure':
    """Parse and cache the processed structure.

    Raises ValueError when the structure file has not been generated yet,
    i.e. when input files were not processed.
    """
    # If we already have a stored value then return it
    if self._structure:
        return self._structure
    # Otherwise we must set the structure
    # Make sure the structure file exists at this point
    if not self.structure_file.exists:
        raise ValueError('Trying to set standard structure but file '
            f'{self.structure_file.path} does not exist yet. Are you trying '
            'to access the standard structure before processing input files?')
    # Note that this is not only the structure class, but it also contains additional logic
    self._structure = Structure.from_pdb_file(self.structure_file.path)
    # If the stable bonds test failed and we had mercy then it is sure our structure will have wrong bonds
    # In order to make it coherent with the topology we will mine topology bonds from here and force them in the structure
    # If we fail to get bonds from topology then just go along with the default structure bonds
    if not self.register.tests.get(STABLE_BONDS_FLAG, None):
        self._structure.bonds = self.reference_bonds
    # Same procedure if we have coarse grain atoms
    elif self.cg_selection:
        self._structure.bonds = self.reference_bonds
    return self._structure
structure = property(get_structure, None, None, "Parsed structure (read only)")
# First frame PDB file
get_first_frame = Task('firstframe', 'Get first frame structure',
    get_first_frame, output_filename = FIRST_FRAME_FILENAME)
get_first_frame_file = get_first_frame.get_output_file
first_frame_file = property(get_first_frame_file, None, None, "First frame (read only)")

# Average structure file
get_average_structure = Task('average', 'Get average structure',
    get_average_structure, output_filename = AVERAGE_STRUCTURE_FILENAME)
get_average_structure_file = get_average_structure.get_output_file
average_structure_file = property(get_average_structure_file, None, None, "Average structure filename (read only)")

# Produce the MD metadata file to be uploaded to the database
prepare_metadata = Task('mdmeta', 'Prepare MD metadata',
    generate_md_metadata, output_filename=OUTPUT_METADATA_FILENAME)

# The processed interactions
# NOTE(review): 'Interaccions' is a typo in the user-facing task title — fix upstream if safe
get_processed_interactions = Task('inter', 'Interaccions processing',
    process_interactions, { 'frames_limit': 1000 })
interactions = property(get_processed_interactions, None, None, "Processed interactions (read only)")
# Set a function to get input values which may be MD specific
# If the MD input is missing then we use the project input
def input_getter (name : str):
    """Build a getter for the input called 'name' which prefers the MD specific
    value and falls back to the project level value when the MD does not define it."""
    # Set the getter
    def getter (self):
        # Get the MD specific input value, if any
        value = self.md_inputs.get(name, None)
        # Note that a falsy but non-None value (e.g. an empty string) is still MD specific
        if value is not None:
            return value
        # If there is no MD input then return the project value
        return getattr(self.project, f'input_{name}')
    return getter
# Assign the MD input getters
# Each property prefers the MD specific value and falls back to the project input
input_interactions = property(input_getter('interactions'), None, None, "Interactions to be analyzed (read only)")
input_pbc_selection = property(input_getter('pbc_selection'), None, None, "Selection of atoms which are still in periodic boundary conditions (read only)")
input_cg_selection = property(input_getter('cg_selection'), None, None, "Selection of atoms which are not actual atoms but coarse grain beads (read only)")
# Internal function to set PBC selection
# It may parse the inputs file selection string if it is available or guess it otherwise
def _set_pbc_selection (self, reference_structure : 'Structure', verbose : bool = False) -> 'Selection':
    """Parse or guess the Periodic Boundary Conditions (PBC) atom selection.

    The selection string comes from the inputs file when available; otherwise
    it is set to 'auto' and guessed from the reference structure (guessing is
    not supported for coarse grain systems).
    """
    if verbose: print('Setting Periodic Boundary Conditions (PBC) atoms selection')
    selection_string = None
    # If there is inputs file then get the input pbc selection
    if self.project.is_inputs_file_available():
        if verbose: print(' Using selection string in the inputs file')
        selection_string = self.input_pbc_selection
    # If there is no inputs file we guess PBC atoms automatically
    else:
        if verbose: print(' No inputs file -> Selection string will be set automatically')
        selection_string = 'auto'
    # Parse the selection string using the reference structure
    parsed_selection = None
    # If the input PBC selection is 'auto' then guess it automatically
    if selection_string == 'auto':
        # To guess PBC atoms (with the current implementation) we must make sure there is no CG
        if reference_structure.has_cg():
            raise InputError('We can not guess PBC atoms in CG systems. Please set PBC atoms manually.\n'
                ' Use the "-pbc" argument or set the inputs file "pbc_selection" field.')
        if verbose: print(' Guessing PBC atoms as solvent, counter ions and lipids')
        parsed_selection = reference_structure.select_pbc_guess()
    # If we have a valid input value then use it
    elif selection_string:
        if verbose: print(f' Selecting PBC atoms "{selection_string}"')
        parsed_selection = reference_structure.select(selection_string)
        if not parsed_selection:
            raise InputError(f'PBC selection "{selection_string}" selected no atoms')
    # If we have an input value but it is empty then we set an empty selection
    else:
        if verbose: print(' No PBC atoms selected')
        parsed_selection = Selection()
    # Log a few of the selected residue names
    if verbose and parsed_selection:
        print(f' Parsed PBC selection has {len(parsed_selection)} atoms')
        selected_residues = reference_structure.get_selection_residues(parsed_selection)
        selected_residue_names = list(set([ residue.name for residue in selected_residues ]))
        limit = 3 # Show a maximum of 3 residue names
        example_residue_names = ', '.join(selected_residue_names[0:limit])
        if len(selected_residue_names) > limit: example_residue_names += ', etc.'
        print(' e.g. ' + example_residue_names)
    return parsed_selection
# Periodic boundary conditions atom selection
def get_pbc_selection (self) -> 'Selection':
    """Return the PBC atom selection, computing and caching it on the project."""
    # Compute and cache the selection on first access
    if self.project._pbc_selection == None:
        self.project._pbc_selection = self._set_pbc_selection(self.structure)
    return self.project._pbc_selection
pbc_selection = property(get_pbc_selection, None, None, "Periodic boundary conditions atom selection (read only)")
# Indices of residues in periodic boundary conditions
# WARNING: Do not inherit project pbc residues
# WARNING: It may trigger all the processing logic of the reference MD when there is no need
def get_pbc_residues (self) -> List[int]:
    """Return (and cache on the project) the indices of residues in periodic
    boundary conditions, according to the PBC atom selection."""
    # If we already have a stored value then return it
    # Note we must check against None: an empty list is a valid cached result
    # NOTE(review): assumes the project initializes _pbc_residues to None — verify
    if self.project._pbc_residues is not None:
        return self.project._pbc_residues
    # If there is no PBC selection then there are no PBC residues
    if not self.pbc_selection:
        self.project._pbc_residues = []
        return self.project._pbc_residues
    # Otherwise we parse the selection and return the list of residue indices
    self.project._pbc_residues = self.structure.get_selection_residue_indices(self.pbc_selection)
    print(f'PBC residues "{self.input_pbc_selection}" -> {len(self.project._pbc_residues)} residues')
    return self.project._pbc_residues
pbc_residues = property(get_pbc_residues, None, None, "Indices of residues in periodic boundary conditions (read only)")
# Set the coarse grain selection
# DANI: Someday we should try to automate this
def _set_cg_selection (self, reference_structure : 'Structure', verbose : bool = False) -> 'Selection':
    """Parse the Coarse Grain (CG) atom selection from the inputs file.

    Returns an empty selection when there is no inputs file or when the input
    CG selection string is empty.
    """
    if verbose: print('Setting Coarse Grained (CG) atoms selection')
    # If there is no inputs file then assume there is no CG selection
    if not self.project.is_inputs_file_available():
        if verbose: print(' No inputs file -> Asuming there is no CG at all')
        return Selection()
    # Otherwise we use the selection string from the inputs
    if verbose: print(' Using selection string in the inputs file')
    selection_string = self.input_cg_selection
    # If the selection is empty, again, assume there is no CG selection
    if not selection_string:
        if verbose: print(' Empty selection -> There is no CG at all')
        return Selection()
    # Otherwise parse the selection string using the reference structure
    # Note the former else branch here was unreachable: the selection string is
    # always set at this point, so we parse it unconditionally
    if verbose: print(f' Selecting CG atoms "{selection_string}"')
    parsed_selection = reference_structure.select(selection_string)
    # Log the parsed selection size
    if verbose: print(f' Parsed CG selection has {len(parsed_selection)} atoms')
    # Log a few of the selected residue names
    if verbose and parsed_selection:
        selected_residues = reference_structure.get_selection_residues(parsed_selection)
        selected_residue_names = list(set([ residue.name for residue in selected_residues ]))
        limit = 3 # Show a maximum of 3 residue names
        example_residue_names = ', '.join(selected_residue_names[0:limit])
        if len(selected_residue_names) > limit: example_residue_names += ', etc.'
        print(' e.g. ' + example_residue_names)
    return parsed_selection
# Coarse grain atom selection
def get_cg_selection (self) -> 'Selection':
    """Return the coarse grain atom selection, computing and caching it on the project."""
    # If we already have a stored value then return it
    # NOTE(review): an empty cached Selection is falsy and thus recomputed on every
    # access — harmless since _set_cg_selection is idempotent, but worth revisiting
    if self.project._cg_selection:
        return self.project._cg_selection
    # Otherwise we must set the CG selection
    self.project._cg_selection = self._set_cg_selection(self.structure)
    return self.project._cg_selection
cg_selection = property(get_cg_selection, None, None, "Coarse grain atom selection (read only)")
# Indices of residues in coarse grain
# WARNING: Do not inherit project cg residues
# WARNING: It may trigger all the processing logic of the reference MD when there is no need
def get_cg_residues (self) -> List[int]:
    """Return (and cache on the project) the indices of residues made of coarse
    grain beads, according to the CG atom selection."""
    # If we already have a stored value then return it
    # Note we must check against None: an empty list is a valid cached result
    # NOTE(review): assumes the project initializes _cg_residues to None — verify
    if self.project._cg_residues is not None:
        return self.project._cg_residues
    # If there is no CG selection then there are no CG residues
    if not self.cg_selection:
        self.project._cg_residues = []
        return self.project._cg_residues
    # Otherwise we parse the selection and return the list of residue indices
    self.project._cg_residues = self.structure.get_selection_residue_indices(self.cg_selection)
    print(f'CG residues "{self.input_cg_selection}" -> {len(self.project._cg_residues)} residues')
    return self.project._cg_residues
cg_residues = property(get_cg_residues, None, None, "Indices of residues in coarse grain (read only)")
# Equilibrium populations from a MSM
# Inherited from project
def get_populations (self) -> List[float]:
    """Delegate to the project populations."""
    return self.project.populations
populations = property(get_populations, None, None, "Equilibrium populations from a MSM (read only)")

# Transition probabilities from a MSM
# Inherited from project
def get_transitions (self) -> List[List[float]]:
    """Delegate to the project transitions."""
    return self.project.transitions
transitions = property(get_transitions, None, None, "Transition probabilities from a MSM (read only)")

# Residues mapping
# Inherited from project
def get_protein_map (self) -> dict:
    """Delegate to the project protein residues mapping."""
    return self.project.protein_map
protein_map = property(get_protein_map, None, None, "Residues mapping (read only)")

# Reference frame, found as the canonical frame regarding bonds
get_reference_frame = Task('reframe', 'Reference frame', get_bonds_canonical_frame)
reference_frame = property(get_reference_frame, None, None, "Reference frame to be used to represent the MD (read only)")
1093 # ---------------------------------------------------------------------------------
1094 # Tests
1095 # ---------------------------------------------------------------------------------
# Sudden jumps test
def is_trajectory_integral (self) -> Optional[bool]:
    """Run (once) and cache the trajectory integrity test, which checks the
    trajectory for sudden jumps."""
    # If we already have a stored value then return it
    if self._trajectory_integrity != None:
        return self._trajectory_integrity
    # Otherwise we must find the value
    self._trajectory_integrity = check_trajectory_integrity(
        input_structure_filename = self.structure_file.path,
        input_trajectory_filename = self.trajectory_file.path,
        structure = self.structure,
        pbc_selection = self.pbc_selection,
        mercy = self.project.mercy,
        trust = self.project.trust,
        register = self.register,
        # time_length = self.time_length,
        check_selection = ALL_ATOMS,
        standard_deviations_cutoff = self.project.rmsd_cutoff,
        snapshots = self.snapshots,
    )
    return self._trajectory_integrity
1118 # ---------------------------------------------------------------------------------
1119 # Analyses
1120 # ---------------------------------------------------------------------------------
# RMSDs analysis
run_rmsds_analysis = Task('rmsds', 'RMSDs analysis',
    rmsds, { 'frames_limit': 5000 })

# TM scores analysis
run_tmscores_analysis = Task('tmscore', 'TM scores analysis',
    tmscores, { 'frames_limit': 200 })

# RMSF, atom fluctuation analysis
run_rmsf_analysis = Task('rmsf', 'Fluctuation analysis', rmsf)

# Radius of gyration analysis
run_rgyr_analysis = Task('rgyr', 'Radius of gyration analysis',
    rgyr, { 'frames_limit': 5000 })

# PCA, principal component analysis
run_pca_analysis = Task('pca', 'Principal component analysis',
    pca, { 'frames_limit': 2000, 'projection_frames': 20 })

# PCA contacts
# DANI: It tries to use a lot of memory, this must be reviewed
# DANI: It may raise an error for not being able to allocate so much memory
# DANI: It may eat all the RAM until a 'Terminado (killed)' error is raised
# DANI: For now I am skipping it
# DANI: It has been unmaintained for a long time, several things must change to recover it
# run_pca_contacts('pcacons', 'PCA contacts', pca_contacts)

# RMSD per residue analysis
run_rmsd_perres_analysis = Task('perres', 'RMSD per residue analysis',
    rmsd_per_residue, { 'frames_limit': 100 })

# RMSD pairwise
# Perform an analysis for the overall structure and then one more analysis for each interaction
run_rmsd_pairwise_analysis = Task('pairwise', 'RMSD pairwise',
    rmsd_pairwise, { 'frames_limit': 200, 'overall_selection': "name CA or name C5'" })

# Run the cluster analysis
run_clusters_analysis = Task('clusters', 'Clusters analysis',
    clusters_analysis, { 'frames_limit': 1000, 'desired_n_clusters': 20 })

# Calculate the distance mean and standard deviation of each pair of residues
run_dist_perres_analysis = Task('dist', 'Distance per residue',
    distance_per_residue, { 'frames_limit': 200 })

# Hydrogen bonds analysis
run_hbonds_analysis = Task('hbonds', 'Hydrogen bonds analysis',
    hydrogen_bonds, { 'time_splits': 100 })

# SASA, solvent accessible surface analysis
run_sas_analysis = Task('sas', 'Solvent accessible surface analysis',
    sasa, { 'frames_limit': 100 })

# Perform the electrostatic and vdw energies analysis for each pair of interaction agents
run_energies_analysis = Task('energies', 'Energies analysis',
    energies, { 'frames_limit': 100 })

# Calculate torsions and then dihedral energies for every dihedral along the trajectory
run_dihedral_energies = Task('dihedrals', 'Dihedral energies analysis',
    compute_dihedral_energies, { 'frames_limit': 100 })

# Perform the pockets analysis
run_pockets_analysis = Task('pockets', 'Pockets analysis',
    pockets, { 'frames_limit': 100, 'maximum_pockets_number': 10 })

# Helical parameters analysis
run_helical_analysis = Task('helical', 'Helical parameters', helical_parameters)

# Markov state model analysis
run_markov_analysis = Task('markov', 'Markov', markov, { 'rmsd_selection': PROTEIN_AND_NUCLEIC })

# Membrane density analysis
run_density_analysis = Task('density', 'Membrane density analysis',
    density, { 'frames_limit': 1000 })

# Membrane thickness analysis
run_thickness_analysis = Task('thickness', 'Membrane thickness analysis',
    thickness, { 'frames_limit': 100 })

# Area per lipid analysis
run_apl_analysis = Task('apl', 'Membrane area per lipid analysis', area_per_lipid)

# Calculate lipid order parameters for membranes
run_lipid_order_analysis = Task('lorder', 'Membrane lipid order analysis',
    lipid_order, { 'frames_limit': 100 })

# Lipid-protein interactions analysis
run_lipid_interactions_analysis = Task('linter', 'Membrane lipid-protein interactions analysis',
    lipid_interactions, { 'frames_limit': 100 })
1211# The project is the main project
1212# A project is a set of related MDs
1213# These MDs share all or most topology and metadata
1214class Project:
1215 def __init__ (self,
1216 # The local directory where the project takes place
1217 directory : str = '.',
1218 # Accession of the project in the database, given that this project is already uploaded
1219 accession : Optional[str] = None,
1220 # URL to query for missing files when an accession is provided
1221 database_url : str = DEFAULT_API_URL,
1222 # A file containing a lof of inputs related to metadata, MD simulation parameters and analysis configurations
1223 inputs_filepath : str = None,
1224 # The input topology filename
1225 # Multiple formats are accepted but the default is our own parsed json topology
1226 input_topology_filepath : Optional[str] = None,
1227 # Input structure filepath
1228 # It may be both relative to the project directory or to every MD directory
1229 input_structure_filepath : Optional[str] = None,
1230 # Input trajectory filepaths
1231 # These files are searched in every MD directory so the path MUST be relative
1232 input_trajectory_filepaths : Optional[str] = None,
1233 # Set the different MD directories to be run
1234 # Each MD directory must contain a trajectory and may contain a structure
1235 md_directories : Optional[List[str]] = None,
1236 # Set an alternative MD configuration input
1237 md_config : Optional[list] = None,
1238 # Reference MD directory
1239 # Project functions which require structure or trajectory will use the ones from the reference MD
1240 # If no reference is passed then the first directory is used
1241 reference_md_index : Optional[int] = None,
1242 # Input populations and transitions (MSM only)
1243 populations_filepath : str = DEFAULT_POPULATIONS_FILENAME,
1244 transitions_filepath : str = DEFAULT_TRANSITIONS_FILENAME,
1245 # Processing and analysis instructions
1246 filter_selection : Union[bool, str] = False,
1247 pbc_selection : Optional[str] = None,
1248 cg_selection : Optional[str] = None,
1249 image : bool = False,
1250 fit : bool = False,
1251 translation : List[float] = [0, 0, 0],
1252 mercy : Union[ List[str], bool ] = [],
1253 trust : Union[ List[str], bool ] = [],
1254 faith : bool = False,
1255 pca_analysis_selection : str = PROTEIN_AND_NUCLEIC_BACKBONE,
1256 pca_fit_selection : str = PROTEIN_AND_NUCLEIC_BACKBONE,
1257 rmsd_cutoff : float = DEFAULT_RMSD_CUTOFF,
1258 interaction_cutoff : float = DEFAULT_INTERACTION_CUTOFF,
1259 interactions_auto : Optional[str] = None,
1260 # Set it we must download just a few frames instead of the whole trajectory
1261 sample_trajectory : Optional[int] = None,
1262 ):
1263 # Save input parameters
1264 self.directory = remove_final_slash(directory)
1265 self.database_url = database_url
1266 self.accession = accession
1267 # Set the project URL in case we have the required data
1268 self.remote = None
1269 if self.database_url and self.accession:
1270 self.remote = Remote(self.database_url, self.accession)
1272 # Set the inputs file
1273 # Set the expected default name in case there is no inputs file since it may be downloaded
1274 self._inputs_file = File(self.pathify(DEFAULT_INPUTS_FILENAME))
1275 # If there is an input filepath then use it
1276 if inputs_filepath:
1277 self._inputs_file = File(inputs_filepath)
1278 # Otherwise guess the inputs file using the accepted filenames
1279 else:
1280 for filename in ACCEPTED_INPUT_FILENAMES:
1281 inputs_file = File(filename)
1282 if inputs_file.exists:
1283 self._inputs_file = inputs_file
1284 break
1285 # Set the input topology file
1286 # Note that even if the input topology path is passed we do not check it exists
1287 # Never forget we can donwload some input files from the database on the fly
1288 self.input_topology_filepath = input_topology_filepath
1289 self._input_topology_file = None
1290 # Input structure and trajectory filepaths
1291 # Do not parse them to files yet, let this to the MD class
1292 self.input_structure_filepath = input_structure_filepath
1293 self.input_trajectory_filepaths = input_trajectory_filepaths
1295 # Make sure the new MD configuration (-md) was not passed as well as old MD inputs (-mdir, -stru, -traj)
1296 if md_config and (md_directories or input_trajectory_filepaths):
1297 raise InputError('MD configurations (-md) is not compatible with old MD inputs (-mdir, -traj)')
1298 # Save the MD configurations
1299 self.md_config = md_config
1300 # Make sure MD configuration has the correct format
1301 if self.md_config:
1302 # Make sure all MD configurations have at least 3 values each
1303 for mdc in self.md_config:
1304 if len(mdc) < 2:
1305 raise InputError('Wrong MD configuration: the patter is -md <directory> <trajectory> <trajectory 2> ...')
1306 # Make sure there are no duplictaed MD directories
1307 md_directories = [ mdc[0] for mdc in self.md_config ]
1308 if len(md_directories) > len(set(md_directories)):
1309 raise InputError('There are duplicated MD directories')
1311 # Input populations and transitions for MSM
1312 self.populations_filepath = populations_filepath
1313 self._populations_file = File(self.populations_filepath)
1314 self.transitions_filepath = transitions_filepath
1315 self._transitions_file = File(self.transitions_filepath)
1317 # Set the processed topology filepath, which depends on the input topology filename
1318 # Note that this file is different from the standard topology, although it may be standard as well
1319 self._topology_filepath = None
1320 self._topology_file = None
1322 # Set the standard topology file
1323 self._standard_topology_file = None
1325 # Set the MD directories
1326 self._md_directories = md_directories
1327 # Check input MDs are correct to far
1328 if self._md_directories:
1329 self.check_md_directories()
1331 # Set the reference MD
1332 self._reference_md = None
1333 self._reference_md_index = reference_md_index
1335 # Set the rest of inputs
1336 # Note that the filter selection variable is not handled here at all
1337 # This is just pased to the filtering function which knows how to handle the default
1338 self.filter_selection = filter_selection
1339 # PBC selection may come from the console or from the inputs
1340 self._input_pbc_selection = pbc_selection
1341 self._input_cg_selection = cg_selection
1342 self.image = image
1343 self.fit = fit
1344 self.translation = translation
1345 self.mercy = mercy
1346 # Fix the mercy input, if needed
1347 # If a boolean is passed instead of a list then we set its corresponding value
1348 if type(mercy) == bool:
1349 if mercy:
1350 self.mercy = AVAILABLE_FAILURES
1351 else:
1352 self.mercy = []
1353 self.trust = trust
1354 # Fix the trust input, if needed
1355 # If a boolean is passed instead of a list then we set its corresponding value
1356 if type(trust) == bool:
1357 if trust:
1358 self.trust = AVAILABLE_CHECKINGS
1359 else:
1360 self.trust = []
1361 self.faith = faith
1362 self.pca_analysis_selection = pca_analysis_selection
1363 self.pca_fit_selection = pca_fit_selection
1364 self.rmsd_cutoff = rmsd_cutoff
1365 self.interaction_cutoff = interaction_cutoff
1366 self.sample_trajectory = sample_trajectory
1367 self.interactions_auto = interactions_auto
1368 # Set the inputs, where values from the inputs file will be stored
1369 self._inputs = None
1371 # Other values which may be found/calculated on demand
1372 self._pbc_selection = None
1373 self._pbc_residues = None
1374 self._cg_selection = None
1375 self._cg_residues = None
1376 self._reference_bonds = None
1377 self._topology_reader = None
1378 self._dihedrals = None
1379 self._populations = None
1380 self._transitions = None
1381 self._pdb_ids = None
1382 self._mds = None
1384 # Force a couple of extraordinary files which is generated if atoms are resorted
1385 self.resorted_bonds_file = File(self.pathify(RESORTED_BONDS_FILENAME))
1386 self.resorted_charges_file = File(self.pathify(RESORTED_CHARGES_FILENAME))
1388 # Set a new entry for the register
1389 # This is useful to track previous workflow runs and problems
1390 register_filepath = self.pathify(REGISTER_FILENAME)
1391 register_file = File(register_filepath)
1392 self.register = Register(register_file)
1393 # Save also warnings apart since they are to be used as an input for metadata tasks
1394 self.warnings = self.register.warnings
1396 # Set the cache
1397 cache_filepath = self.pathify(CACHE_FILENAME)
1398 cache_file = File(cache_filepath)
1399 self.cache = Cache(cache_file)
1401 # Set tasks whose output is to be overwritten
1402 self.overwritables = set()
1404 def __repr__ (self):
1405 return 'Project'
1407 # Given a filename or relative path, add the project directory path at the beginning
1408 def pathify (self, filename_or_relative_path : str) -> str:
1409 return self.directory + '/' + filename_or_relative_path
1411 # Check MD directories to be right
1412 # If there is any problem then directly raise an input error
1413 def check_md_directories (self):
1414 # Check there is at least one MD
1415 if len(self._md_directories) < 1:
1416 raise InputError('There must be at least one MD')
1417 # Check there are not duplicated MD directories
1418 if len(set(self._md_directories)) != len(self._md_directories):
1419 raise InputError('There are duplicated MD directories')
    # Set a function to get MD directories
    def get_md_directories (self) -> list:
        """Resolve the list of MD directories, caching the result.
        Priority: explicitly declared directories > inputs file > directories containing a register file."""
        # If MD directories are already declared then return them
        if self._md_directories:
            return self._md_directories
        # Otherwise use the default MDs
        self._md_directories = []
        # Use the MDs from the inputs file when available
        if self.is_inputs_file_available() and self.input_mds:
            for input_md in self.input_mds:
                # Get the directory according to the inputs
                directory = input_md.get(MD_DIRECTORY, None)
                if directory:
                    check_directory(directory)
                # If no directory is specified in the inputs then guess it from the MD name
                else:
                    # NOTE(review): a missing 'name' key would raise a KeyError here — confirm inputs always carry it
                    name = input_md['name']
                    if not name:
                        name = 'unnamed'
                    directory = name_2_directory(name)
                self._md_directories.append(directory)
        # Otherwise, guess MD directories by checking which directories include a register file
        else:
            # Only first-level subdirectories of the project directory are considered
            available_directories = sorted(next(walk(self.directory))[1])
            for directory in available_directories:
                if exists(directory + '/' + REGISTER_FILENAME):
                    self._md_directories.append(directory)
            # If we found no MD directory then it means MDs were never declared before
            if len(self._md_directories) == 0:
                raise InputError('Impossible to know which are the MD directories. '
                    'You can either declare them using the "-mdir" option or by providing an inputs file')
        # Re-validate whatever list we ended with
        self.check_md_directories()
        return self._md_directories
    md_directories = property(get_md_directories, None, None, "MD directories (read only)")
1456 # Set the reference MD index
1457 def get_reference_md_index (self) -> int:
1458 # If we are already have a value then return it
1459 if self._reference_md_index:
1460 return self._reference_md_index
1461 # Otherwise we must find the reference MD index
1462 # If the inputs file is available then it must declare the reference MD index
1463 if self.is_inputs_file_available():
1464 self._reference_md_index = self.get_input('mdref')
1465 # Otherwise we simply set the first MD as the reference and warn the user about this
1466 if self._reference_md_index == None:
1467 warn('No reference MD was specified. The first MD will be used as reference.')
1468 self._reference_md_index = 0
1469 return self._reference_md_index
1470 reference_md_index = property(get_reference_md_index, None, None, "Reference MD index (read only)")
    # Set the reference MD
    def get_reference_md (self) -> 'MD':
        """Get the reference MD, resolving it on first access.
        Project functions which require structure or trajectory use this MD."""
        # If we already have a value then return it
        if self._reference_md:
            return self._reference_md
        # Otherwise pick it among the instantiated MDs by the reference index
        self._reference_md = self.mds[self.reference_md_index]
        return self._reference_md
    reference_md = property(get_reference_md, None, None, "Reference MD (read only)")
    # Setup the MDs
    def get_mds (self) -> list:
        """Instantiate every declared MD, only once.
        Supports both the new MD configuration system (-md) and the old one (-mdir, -stru, -traj)."""
        # If MDs are already declared then return them
        if self._mds:
            return self._mds
        # Now instantiate a new MD for each declared MD and save the reference MD
        self._mds = []
        # New system with MD configurations (-md)
        if self.md_config:
            for n, config in enumerate(self.md_config, 1):
                directory = config[0]
                # LEGACY
                # In a previous version, the md config argument also held the structure
                # This was the second argument, so we check if we have more than 2 arguments
                # If this is the case, then check if the second argument has a different format
                # Note that PDB format is also a trajectory supported format
                has_structure = False
                if len(config) > 2:
                    first_sample = File(config[1])
                    second_sample = File(config[2])
                    # Formats differing means the second argument is a structure, not another trajectory
                    if first_sample.format != second_sample.format:
                        has_structure = True
                # Finally set the input structure and trajectories
                input_structure_filepath = config[1] if has_structure else self.input_structure_filepath
                input_trajectory_filepaths = config[2:] if has_structure else config[1:]
                # Define the MD
                md = MD(
                    project = self, number = n, directory = directory,
                    input_structure_filepath = input_structure_filepath,
                    input_trajectory_filepaths = input_trajectory_filepaths,
                )
                self._mds.append(md)
        # Old system (-mdir, -stru -traj)
        else:
            for n, md_directory in enumerate(self.md_directories, 1):
                md = MD(
                    project = self, number = n, directory = md_directory,
                    input_structure_filepath = self.input_structure_filepath,
                    input_trajectory_filepaths = self.input_trajectory_filepaths,
                )
                self._mds.append(md)
        return self._mds
    mds = property(get_mds, None, None, "Available MDs (read only)")
1526 # Check input files exist when their filenames are read
1527 # If they do not exist then try to download them
1528 # If the download is not possible then raise an error
1530 # Inputs filename ------------
1532 def is_inputs_file_available (self) -> bool:
1533 """Set a function to check if inputs file is available.
1534 Note that asking for it when it is not available will lead to raising an input error."""
1535 # If name is not declared then it is impossible to reach it
1536 if not self._inputs_file:
1537 return False
1538 # If the file already exists then it is available
1539 if self._inputs_file.exists:
1540 return True
1541 # If it does not exist but it may be downloaded then it is available
1542 if self.remote:
1543 return True
1544 return False
1546 def get_inputs_file (self) -> File:
1547 """Set a function to load the inputs file."""
1548 # There must be an inputs filename
1549 if not self._inputs_file:
1550 raise InputError('Not defined inputs filename')
1551 # If the file already exists then we are done
1552 if self._inputs_file.exists:
1553 return self._inputs_file
1554 # Try to download it
1555 # If we do not have the required parameters to download it then we surrender here
1556 if not self.remote:
1557 raise InputError(f'Missing inputs file "{self._inputs_file.filename}"')
1558 # Download the inputs json file if it does not exists
1559 self.remote.download_inputs_file(self._inputs_file)
1560 return self._inputs_file
1561 inputs_file = property(get_inputs_file, None, None, "Inputs filename (read only)")
1563 # Topology filename ------------
1566 def guess_input_topology_filepath (self) -> Optional[str]:
1567 """If there is not input topology filepath, we try to guess it among the files in the project directory.
1568 Note that if we can download from the remote then we must check the remote available files as well."""
1569 # Find the first supported topology file according to its name and format
1570 def find_first_accepted_topology_filename (available_filenames : List[str]) -> Optional[str]:
1571 for filename in available_filenames:
1572 # Make sure it is a valid topology file candidate
1573 # i.e. topology.xxx
1574 filename_splits = filename.split('.')
1575 if len(filename_splits) != 2 or filename_splits[0] != 'topology':
1576 continue
1577 # Then make sure its format is among the accepted topology formats
1578 extension = filename_splits[1]
1579 format = EXTENSION_FORMATS[extension]
1580 if format in ACCEPTED_TOPOLOGY_FORMATS:
1581 return filename
1582 return None
1583 # First check among the local available files
1584 local_files = list_files(self.directory)
1585 accepted_topology_filename = find_first_accepted_topology_filename(local_files)
1586 if accepted_topology_filename:
1587 return self.pathify(accepted_topology_filename)
1588 # In case we did not find a topology among the local files, repeat the process with the remote files
1589 if self.remote:
1590 remote_files = self.remote.available_files
1591 accepted_topology_filename = find_first_accepted_topology_filename(remote_files)
1592 if accepted_topology_filename:
1593 return self.pathify(accepted_topology_filename)
1594 # If no actual topology is to be found then try with the standard topology instead
1595 # Check if the standard topology file is available
1596 # Note that we do not use standard_topology_file property to avoid generating it at this point
1597 standard_topology_filepath = self.pathify(STANDARD_TOPOLOGY_FILENAME)
1598 standard_topology_file = File(standard_topology_filepath)
1599 if standard_topology_file.exists:
1600 return standard_topology_filepath
1601 # If not we may also try to download the standard topology
1602 if self.remote:
1603 self.remote.download_standard_topology(standard_topology_file)
1604 return standard_topology_filepath
1605 # DEPRECATED: Find if the raw charges file is present as a last resource
1606 if exists(RAW_CHARGES_FILENAME):
1607 return RAW_CHARGES_FILENAME
1608 # If we did not find any valid topology filepath at this point then return None
1609 return None
    def get_input_topology_filepath (self) -> Optional[str]:
        """Get the input topology filepath from the inputs or try to guess it.
        If the input topology filepath is a 'no' flag then we consider there is no topology at all.
        So far we extract atom charges and atom bonds from the topology file.
        In this scenario we can keep working but there are some consequences:
        1 - Analyses using atom charges such as 'energies' will be skipped
        2 - The standard topology file will not include atom charges
        3 - Bonds will be guessed"""
        # An explicit 'no' flag means the user chose to run without a topology
        if type(self.input_topology_filepath) == str and self.input_topology_filepath.lower() in { 'no', 'not', 'na' }:
            return MISSING_TOPOLOGY
        # Set a function to parse possible glob notation
        def parse (filepath : str) -> str:
            # If there is no glob pattern then just return the string as is
            if not is_glob(filepath):
                return filepath
            # If there is a glob pattern then parse it
            parsed_filepaths = glob(filepath)
            if len(parsed_filepaths) == 0:
                # Warn the user in case it was trying to use glob syntax to download remote files
                if self.remote:
                    warn('Spread syntax is not supported to download remote files')
                raise InputError(f'No topologies found with "{filepath}"')
            # Exactly one match is required: a project has a single topology
            if len(parsed_filepaths) > 1:
                raise InputError(f'Multiple topologies found with "{filepath}": {", ".join(parsed_filepaths)}')
            return parsed_filepaths[0]
        # If this value was passed through command line then it would be set as the internal value already
        if self.input_topology_filepath:
            return parse(self.input_topology_filepath)
        # Check if the inputs file has the value
        if self.is_inputs_file_available():
            # Get the input value, whose key must exist
            inputs_value = self.get_input('input_topology_filepath')
            # If there is a valid input then use it
            if inputs_value:
                return parse(inputs_value)
        # Otherwise we must guess which is the topology file
        guess = self.guess_input_topology_filepath()
        if guess:
            return guess
        # If nothing worked then surrender
        raise InputError('Missing input topology file path. Please provide a topology file using the "-top" argument.\n' +
            ' Note that you may run the workflow without a topology file. To do so, use the "-top no" argument.\n' +
            ' However this has implications since we usually mine atom charges and bonds from the topology file.\n' +
            ' Some analyses such us the interaction energies will be skiped')
1657 def get_input_topology_file (self) -> Optional[File]:
1658 """Get the input topology file.
1659 If the file is not found try to download it."""
1660 # If we already have a value then return it
1661 if self._input_topology_file != None:
1662 return self._input_topology_file
1663 # Set the input topology filepath
1664 input_topology_filepath = self.get_input_topology_filepath()
1665 # If the input filepath is None then it menas we must proceed without a topology
1666 if input_topology_filepath == MISSING_TOPOLOGY:
1667 self._input_topology_file = MISSING_TOPOLOGY
1668 return self._input_topology_file
1669 # If no input is passed then we check the inputs file
1670 # Set the file
1671 self._input_topology_file = File(input_topology_filepath)
1672 # If the file already exists then we are done
1673 if self._input_topology_file.exists:
1674 return self._input_topology_file
1675 # Try to download it
1676 # If we do not have the required parameters to download it then we surrender here
1677 if not self.remote:
1678 raise InputError(f'Missing input topology file "{self._input_topology_file.filename}"')
1679 # Otherwise, try to download it using the files endpoint
1680 # Note that this is not usually required
1681 self.remote.download_file(self._input_topology_file)
1682 # In case the topology is a '.top' file we consider it is a Gromacs topology
1683 # It may come with additional itp files we must download as well
1684 if self._input_topology_file.format == 'top':
1685 # Find available .itp files and download each of them
1686 itp_filenames = [filename for filename in self.remote.available_files if filename[-4:] == '.itp']
1687 for itp_filename in itp_filenames:
1688 itp_filepath = self.pathify(itp_filename)
1689 itp_file = File(itp_filepath)
1690 self.remote.download_file(itp_file)
1691 return self._input_topology_file
1692 input_topology_file = property(get_input_topology_file, None, None, "Input topology file (read only)")
1694 # Input structure filename ------------
1695 def get_input_structure_file (self) -> File:
1696 """Get the input structure filename."""
1697 # When calling this function make sure all MDs have the file or try to download it
1698 return self.reference_md._input_structure_file
1699 input_structure_file = property(get_input_structure_file, None, None, "Input structure filename for each MD (read only)")
1701 # Input trajectory filename ------------
1703 def get_input_trajectory_files (self) -> List[File]:
1704 """Get the input trajectory filename(s) from the inputs.
1705 If file(s) are not found try to download it."""
1706 return self.reference_md._input_trajectory_files
1707 input_trajectory_files = property(get_input_trajectory_files, None, None, "Input trajectory filenames for each MD (read only)")
    # Populations filename ------------

    def get_populations_file (self) -> Optional[File]:
        """Get the MSM equilibrium populations file, or None when it is not available."""
        # get_file checks existence and tries to download the file otherwise
        if not self.get_file(self._populations_file):
            return None
        return self._populations_file
    populations_file = property(get_populations_file, None, None, "MSM equilibrium populations filename (read only)")
    # Transitions filename ------------

    def get_transitions_file (self) -> Optional[File]:
        """Get the MSM transition probabilities file, or None when it is not available."""
        # get_file checks existence and tries to download the file otherwise
        if not self.get_file(self._transitions_file):
            return None
        return self._transitions_file
    transitions_file = property(get_transitions_file, None, None, "MSM transition probabilities filename (read only)")
1727 # ---------------------------------
1729 # Check if a file exists
1730 # If not, try to download it from the database
1731 # If the file is not found in the database it is fine, we do not even warn the user
1732 # Note that nowadays this function is used to get populations and transitions files, which are not common
1733 def get_file (self, target_file : File) -> bool:
1734 return self.reference_md.get_file(target_file)
1736 # Input file values -----------------------------------------
1738 # First of all set input themselves
1740 # Get inputs
1741 def get_inputs (self) -> dict:
1742 # If inputs are already loaded then return them
1743 if self._inputs:
1744 return self._inputs
1745 # Otherwise, load inputs from the inputs file
1746 inputs_data = None
1747 if self.inputs_file.format == 'json':
1748 inputs_data = load_json(self.inputs_file.path)
1749 elif self.inputs_file.format == 'yaml':
1750 inputs_data = load_yaml(self.inputs_file.path)
1751 else:
1752 raise InputError('Input file format is not supported. Please use json or yaml files.')
1753 if not inputs_data:
1754 raise InputError('Input file is empty')
1755 self._inputs = inputs_data
1756 # Legacy fixes
1757 old_pdb_ids = self._inputs.get('pdbIds', None)
1758 if old_pdb_ids:
1759 self._inputs['pdb_ids'] = old_pdb_ids
1760 # Finally return the updated inputs
1761 return self._inputs
1762 inputs = property(get_inputs, None, None, "Inputs from the inputs file (read only)")
1764 # Then set getters for every value in the inputs file
1766 # Get a specific 'input' value
1767 # Handle a possible missing keys
1768 def get_input (self, name: str):
1769 value = self.inputs.get(name, MISSING_INPUT_EXCEPTION)
1770 # If we had a value then return it
1771 if value != MISSING_INPUT_EXCEPTION:
1772 return value
1773 # If the field is not specified in the inputs file then set a defualt value
1774 default_value = DEFAULT_INPUT_VALUES.get(name, None)
1775 # Warn the user about this
1776 warn(f'Missing input "{name}" -> Using default value: {default_value}')
1777 return default_value
    # Set a function to get a specific 'input' value by its key/name
    # Note that we return the getter function but we do not call it just yet:
    # each returned closure becomes the fget of one of the properties below
    def input_getter (name : str):
        def getter (self):
            return self.get_input(name)
        return getter

    # Assign the getters
    # Each property simply reads one key from the inputs file (with default fallback)
    input_interactions = property(input_getter('interactions'), None, None, "Interactions to be analyzed (read only)")
    input_protein_references = property(input_getter('forced_references'), None, None, "Uniprot IDs to be used first when aligning protein sequences (read only)")
    input_pdb_ids = property(input_getter('pdb_ids'), None, None, "Protein Data Bank IDs used for the setup of the system (read only)")
    input_type = property(input_getter('type'), None, None, "Set if its a trajectory or an ensemble (read only)")
    input_mds = property(input_getter('mds'), None, None, "Input MDs configuration (read only)")
    input_ligands = property(input_getter('ligands'), None, None, "Input ligand references (read only)")
    input_force_fields = property(input_getter('ff'), None, None, "Input force fields (read only)")
    input_collections = property(input_getter('collections'), None, None, "Input collections (read only)")
    input_chain_names = property(input_getter('chainnames'), None, None, "Input chain names (read only)")
    input_framestep = property(input_getter('framestep'), None, None, "Input framestep (read only)")
    input_name = property(input_getter('name'), None, None, "Input name (read only)")
    input_description = property(input_getter('description'), None, None, "Input description (read only)")
    input_authors = property(input_getter('authors'), None, None, "Input authors (read only)")
    input_groups = property(input_getter('groups'), None, None, "Input groups (read only)")
    input_contact = property(input_getter('contact'), None, None, "Input contact (read only)")
    input_program = property(input_getter('program'), None, None, "Input program (read only)")
    input_version = property(input_getter('version'), None, None, "Input version (read only)")
    input_method = property(input_getter('method'), None, None, "Input method (read only)")
    input_license = property(input_getter('license'), None, None, "Input license (read only)")
    input_linkcense = property(input_getter('linkcense'), None, None, "Input license link (read only)")
    input_citation = property(input_getter('citation'), None, None, "Input citation (read only)")
    input_thanks = property(input_getter('thanks'), None, None, "Input acknowledgements (read only)")
    input_links = property(input_getter('links'), None, None, "Input links (read only)")
    input_timestep = property(input_getter('timestep'), None, None, "Input timestep (read only)")
    input_temperature = property(input_getter('temp'), None, None, "Input temperature (read only)")
    input_ensemble = property(input_getter('ensemble'), None, None, "Input ensemble (read only)")
    input_water = property(input_getter('wat'), None, None, "Input water force field (read only)")
    input_boxtype = property(input_getter('boxtype'), None, None, "Input boxtype (read only)")
    input_pbc_selection = property(input_getter('pbc_selection'), None, None, "Input Periodic Boundary Conditions (PBC) selection (read only)")
    input_cg_selection = property(input_getter('cg_selection'), None, None, "Input Coarse Grained (CG) selection (read only)")
    input_customs = property(input_getter('customs'), None, None, "Input custom representations (read only)")
    input_orientation = property(input_getter('orientation'), None, None, "Input orientation (read only)")
    input_multimeric = property(input_getter('multimeric'), None, None, "Input multimeric labels (read only)")
    # Additional topic-specific inputs
    input_cv19_unit = property(input_getter('cv19_unit'), None, None, "Input Covid-19 Unit (read only)")
    input_cv19_startconf = property(input_getter('cv19_startconf'), None, None, "Input Covid-19 starting conformation (read only)")
    input_cv19_abs = property(input_getter('cv19_abs'), None, None, "Input Covid-19 antibodies (read only)")
    input_cv19_nanobs = property(input_getter('cv19_nanobs'), None, None, "Input Covid-19 nanobodies (read only)")
1826 # PBC selection may come from the console or from the inputs file
1827 # Console has priority over the inputs file
1828 def get_input_pbc_selection (self) -> Optional[str]:
1829 # If we have an internal value then return it
1830 if self._input_pbc_selection:
1831 return self._input_pbc_selection
1832 # As an exception, we avoid asking for the inputs file if it is not available
1833 # This input is required for some early processing steps where we do not need the inputs file for anything else
1834 if not self.is_inputs_file_available():
1835 return None
1836 # Otherwise, find it in the inputs
1837 # Get the input value, whose key must exist
1838 self._input_pbc_selection = self.get_input('pbc_selection')
1839 return self._input_pbc_selection
1840 input_pbc_selection = property(get_input_pbc_selection, None, None, "Selection of atoms which are still in periodic boundary conditions (read only)")
1842 # CG selection may come from the console or from the inputs file
1843 # Console has priority over the inputs file
1844 def get_input_cg_selection (self) -> Optional[str]:
1845 # If we have an internal value then return it
1846 if self._input_cg_selection:
1847 return self._input_cg_selection
1848 # As an exception, we avoid asking for the inputs file if it is not available
1849 # This input is required for some early processing steps where we do not need the inputs file for anything else
1850 if not self.is_inputs_file_available():
1851 return None
1852 # Otherwise, find it in the inputs
1853 # Get the input value, whose key must exist
1854 self._input_cg_selection = self.get_input('cg_selection')
1855 return self._input_cg_selection
1856 input_cg_selection = property(get_input_cg_selection, None, None, "Selection of atoms which are not acutal atoms but Coarse Grained beads (read only)")
1858 # Set additional values infered from input values
1860 def check_is_time_dependent (self) -> bool:
1861 """Set if MDs are time dependent."""
1862 if self.input_type == 'trajectory':
1863 return True
1864 elif self.input_type == 'ensemble':
1865 return False
1866 raise InputError(f'Not supported input "type" value: {self.input_type}. It must be "trajectory" or "ensemble"')
1867 is_time_dependent = property(check_is_time_dependent, None, None, "Check if trajectory frames are time dependent (read only)")
1869 # Processed files ----------------------------------------------------
1871 # Set the expected output topology filename given the input topology filename
1872 # Note that topology formats are conserved
1873 def inherit_topology_filename (self) -> Optional[str]:
1874 if self.input_topology_file == MISSING_TOPOLOGY:
1875 return None
1876 filename = self.input_topology_file.filename
1877 if not filename:
1878 return None
1879 if filename == RAW_CHARGES_FILENAME:
1880 return filename
1881 standard_format = self.input_topology_file.format
1882 return 'topology.' + standard_format
1884 def get_topology_filepath (self) -> str:
1885 """Get the processed topology file path."""
1886 # If we have a stored value then return it
1887 if self._topology_filepath:
1888 return self._topology_filepath
1889 # Otherwise we must find it
1890 inherited_filename = self.inherit_topology_filename()
1891 self._topology_filepath = self.pathify(inherited_filename) if inherited_filename else None
1892 return self._topology_filepath
1893 topology_filepath = property(get_topology_filepath, None, None, "Topology file path (read only)")
1895 def get_topology_file (self) -> str:
1896 """Get the processed topology file."""
1897 # If we have a stored value then return it
1898 # This means we already found or generated this file
1899 if self._topology_file != None:
1900 return self._topology_file
1901 # If the file already exists then we are done
1902 self._topology_file = File(self.topology_filepath) if self.topology_filepath != None else MISSING_TOPOLOGY
1903 # If the faith flag was passed then simply make sure the input file makes sense
1904 if self.faith:
1905 if self.input_topology_file != self._topology_file:
1906 raise InputError('Input topology file is not equal to output topology file but the "faith" flag was used.\n'
1907 ' Please refrain from using the faith argument (-f) if you ignore its effect.')
1908 if not self.input_topology_file.exists:
1909 raise InputError('Input topology file does not exist but the "faith" flag was used.\n'
1910 ' Please refrain from using the faith argument (-f) if you ignore its effect.')
1911 return self._topology_file
1912 # Run the processing logic
1913 self.reference_md.process_input_files(self.reference_md)
1914 # Now that the file is sure to exist we return it
1915 return self._topology_file
1916 topology_file = property(get_topology_file, None, None, "Topology file (read only)")
1918 # Get the processed structure from the reference MD
1919 def get_structure_file (self) -> str:
1920 return self.reference_md.structure_file
1921 structure_file = property(get_structure_file, None, None, "Structure filename from the reference MD (read only)")
1923 # Get the processed trajectory from the reference MD
1924 def get_trajectory_file (self) -> str:
1925 return self.reference_md.trajectory_file
1926 trajectory_file = property(get_trajectory_file, None, None, "Trajectory filename from the reference MD (read only)")
1928 # ---------------------------------------------------------------------------------
1929 # Others values which may be found/calculated and files to be generated on demand
1930 # ---------------------------------------------------------------------------------
1932 # Parsed structure from reference MD
1933 def get_structure (self) -> 'Structure':
1934 return self.reference_md.structure
1935 structure = property(get_structure, None, None, "Parsed structure from the reference MD (read only)")
1937 # Indices of residues in periodic boundary conditions
1938 def get_pbc_residues (self) -> List[int]:
1939 return self.reference_md.pbc_residues
1940 pbc_residues = property(get_pbc_residues, None, None, "Indices of residues in periodic boundary conditions (read only)")
1942 # Indices of residues in coarse grain
1943 def get_cg_residues (self) -> List[int]:
1944 return self.reference_md.cg_residues
1945 cg_residues = property(get_cg_residues, None, None, "Indices of residues in coarse grain (read only)")
1947 # Reference MD spanshots
1948 # Used next to the reference MD trajectory data
1949 def get_snaphsots (self) -> int:
1950 return self.reference_md.snapshots
1951 snapshots = property(get_snaphsots, None, None, "Reference MD snapshots (read only)")
1953 # Check if we must check stable bonds
1954 def get_check_stable_bonds (self) -> bool:
1955 # Set if stable bonds have to be checked
1956 must_check = STABLE_BONDS_FLAG not in self.trust
1957 # If this analysis has been already passed then we can trust structure bonds
1958 if self.register.tests.get(STABLE_BONDS_FLAG, None) == True:
1959 must_check = False
1960 return must_check
1961 must_check_stable_bonds = property(get_cg_residues, None, None, "Indices of residues in coarse grain (read only)")
    # Reference bonds
    # Task wrapping find_safe_bonds; Task instances are used as property
    # getters below, so Task presumably implements __call__ — TODO confirm
    get_reference_bonds = Task('refbonds', 'Reference bonds', find_safe_bonds)
    reference_bonds = property(get_reference_bonds, None, None, "Atom bonds to be trusted (read only)")

    # Atom charges
    # NOTE: the RHS 'get_charges' resolves to the module-level function of the
    # same name (the class namespace does not hold it yet at this point),
    # which this class attribute then shadows
    get_charges = Task('charges', 'Getting atom charges', get_charges)
    charges = property(get_charges, None, None, "Atom charges (read only)")
1971 # Topolody data reader
1972 def get_topology_reader (self) -> 'Topology':
1973 # If we already have a stored value then return it
1974 if self._topology_reader: return self._topology_reader
1975 # Instantiate the topology reader
1976 self._topology_reader = Topology(self.topology_file)
1977 return self._topology_reader
1978 topology_reader = property(get_topology_reader, None, None, "Topology reader (read only)")
1980 def get_dihedrals (self) -> List[dict]:
1981 """Get the topology dihedrals."""
1982 # If we already have a stored value then return it
1983 if self._dihedrals: return self._dihedrals
1984 # Calculate the dihedrals otherwise
1985 self._dihedrals = self.topology_reader.get_dihedrals_data()
1986 return self._dihedrals
1987 dihedrals = property(get_dihedrals, None, None, "Topology dihedrals (read only)")
1989 # Equilibrium populations from a MSM
1990 def get_populations (self) -> Optional[List[float]]:
1991 # If we already have a stored value then return it
1992 if self._populations:
1993 return self._populations
1994 # Otherwise we must find the value
1995 if not self.populations_file:
1996 return None
1997 self._populations = read_file(self.populations_file)
1998 return self._populations
1999 populations = property(get_populations, None, None, "Equilibrium populations from a MSM (read only)")
2001 # Transition probabilities from a MSM
2002 def get_transitions (self) -> Optional[List[List[float]]]:
2003 # If we already have a stored value then return it
2004 if self._transitions:
2005 return self._transitions
2006 # Otherwise we must find the value
2007 if not self.transitions_file:
2008 return None
2009 self._transitions = read_file(self.transitions_file)
2010 return self._transitions
2011 transitions = property(get_transitions, None, None, "Transition probabilities from a MSM (read only)")
2013 # Tested and standarized PDB ids
2014 def get_pdb_ids (self) -> List[str]:
2015 # If we already have a stored value then return it
2016 if self._pdb_ids != None:
2017 return self._pdb_ids
2018 # Otherwise test and standarize input PDB ids
2019 self._pdb_ids = []
2020 # If there is no input pdb ids (may be None) then stop here
2021 if not self.input_pdb_ids:
2022 return []
2023 # If input PDB ids is a string instead of a list then fix it
2024 input_pdb_ids = self.input_pdb_ids
2025 if type(input_pdb_ids) == str:
2026 input_pdb_ids = [ input_pdb_ids ]
2027 # Iterate input PDB ids
2028 for input_pdb_id in input_pdb_ids:
2029 # First make sure this is a PDB id
2030 if not re.match(PDB_ID_FORMAT, input_pdb_id):
2031 raise InputError(f'Input PDB id "{input_pdb_id}" does not look like a PDB id')
2032 # Make letters upper
2033 pdb_id = input_pdb_id.upper()
2034 self._pdb_ids.append(pdb_id)
2035 return self._pdb_ids
2036 pdb_ids = property(get_pdb_ids, None, None, "Tested and standarized PDB ids (read only)")
    # Prepare the PDB references file to be uploaded to the database
    get_pdb_references = Task('pdbs', 'Prepare PDB references',
        prepare_pdb_references, output_filename = PDB_REFERENCES_FILENAME)
    # Shortcut to the task's output file getter
    pdb_references_file = get_pdb_references.get_output_file

    # Map the structure aminoacids sequences against the Uniprot reference sequences
    get_protein_map = Task('protmap', 'Protein residues mapping',
        generate_protein_mapping, output_filename=PROTEIN_REFERENCES_FILENAME)
    protein_map = property(get_protein_map, None, None, "Protein residues mapping (read only)")

    # Define the output file of the protein mapping including protein references
    get_protein_references_file = get_protein_map.get_output_file
    protein_references_file = property(get_protein_references_file, None, None, "File including protein refereces data mined from UniProt (read only)")

    # Get chain references
    # NOTE(review): unlike its siblings this task has no property declared — confirm intended
    get_chain_references = Task('chains', 'Chain references',
        prepare_chain_references, output_filename = OUTPUT_CHAINS_FILENAME)

    # Get the ligand residues mapping
    get_ligand_map = Task('ligmap', 'Ligand residues mapping',
        generate_ligand_mapping, output_filename = LIGAND_REFERENCES_FILENAME)
    ligand_map = property(get_ligand_map, None, None, "Ligand references (read only)")

    # Define the output file of the ligand mapping including ligand references
    get_ligand_references_file = get_ligand_map.get_output_file
    ligand_references_file = property(get_ligand_references_file, None, None, "File including ligand refereces data mined from PubChem (read only)")

    # MDAnalysis Universe object
    # Caching is explicitly disabled for this task
    get_MDAnalysis_Universe = Task('mda_univ', 'MDAnalysis Universe object',
        get_mda_universe, use_cache = False)
    universe = property(get_MDAnalysis_Universe, None, None, "MDAnalysis Universe object (read only)")

    # Get the lipid references
    get_lipid_map = Task('lipmap', 'Lipid mapping',
        generate_lipid_references, output_filename = LIPID_REFERENCES_FILENAME)
    lipid_map = property(get_lipid_map, None, None, "Lipid mapping (read only)")

    # Define the output file of the lipid mapping including lipid references
    get_lipid_references_file = get_lipid_map.get_output_file
    lipid_references_file = property(get_lipid_references_file, None, None, "File including lipid references data mined from PubChem (read only)")

    # Get mapping of residues in the membrane
    get_membrane_map = Task('membranes', 'Membrane mapping',
        generate_membrane_mapping, output_filename = MEMBRANE_MAPPING_FILENAME)
    membrane_map = property(get_membrane_map, None, None, "Membrane mapping (read only)")

    # Build the residue map from both proteins and ligands maps
    # This is formatted as both the standard topology and metadata producers expect them
    get_residue_map = Task('resmap', 'Residue mapping', generate_residue_mapping)
    residue_map = property(get_residue_map, None, None, "Residue map (read only)")

    # Get interaction types
    get_interaction_types = Task('intertypes', 'Finding interaction types', find_interaction_types)
    interaction_types = property(get_interaction_types, None, None, "Interaction types (read only)")

    # Prepare the project metadata file to be uploaded to the database
    prepare_metadata = Task('pmeta', 'Prepare project metadata',
        prepare_project_metadata, output_filename=OUTPUT_METADATA_FILENAME)

    # Prepare the standard topology file to be uploaded to the database
    prepare_standard_topology = Task('stopology', 'Standard topology file',
        generate_topology, output_filename = STANDARD_TOPOLOGY_FILENAME)
    get_standard_topology_file = prepare_standard_topology.get_output_file
    standard_topology_file = property(get_standard_topology_file, None, None, "Standard topology filename (read only)")

    # Get a screenshot of the system
    get_screenshot_filename = Task('screenshot', 'Screenshot file',
        get_screenshot, output_filename = OUTPUT_SCREENSHOT_FILENAME)
    screenshot_filename = property(get_screenshot_filename, None, None, "Screenshot filename (read only)")
# AUXILIAR FUNCTIONS ---------------------------------------------------------------------------

# DANI: once the Markov (MSM) file format is settled this function will not be needed
def read_file (target_file : 'File') -> dict:
    """Read a file which may be in different formats (.npy or .json).

    The format is taken from the filename extension.
    Raises ValueError on unsupported extensions; previously such files
    silently produced a None return.
    """
    # Get the file format from the filename extension
    file_format = target_file.filename.split('.')[-1]
    # Read numpy files
    if file_format == 'npy':
        return numpy.load(target_file.path)
    # Read JSON files
    if file_format == 'json':
        return load_json(target_file.path)
    # FIX: complain explicitly instead of silently returning None
    raise ValueError(f'Unsupported file format "{file_format}" in file {target_file.filename}')
# Set a function to convert an MD name into an equivalent MD directory
def name_2_directory (name : str) -> str:
    """Convert an MD name into an equivalent MD directory name.

    White spaces become underscores and forbidden characters are dropped.
    """
    cleaned = name.replace(' ', '_')
    for forbidden in FORBIDDEN_DIRECTORY_CHARACTERS:
        cleaned = cleaned.replace(forbidden, '')
    return cleaned
def check_directory (directory : str) -> None:
    """Check for problematic characters in a directory path.

    Raises InputError when a forbidden character is found.
    FIX: the return annotation was '-> str' although the function never
    returns a value.
    """
    for character in FORBIDDEN_DIRECTORY_CHARACTERS:
        if character in directory:
            raise InputError(f'Directory path "{directory}" includes the forbidden character "{character}"')
def directory_2_name (directory : str) -> str:
    """Convert an MD directory into an equivalent MD name.

    Only the last path component is kept (which also discards a possible
    leading './') and underscores become white spaces.
    """
    last_component = directory.split('/')[-1]
    return last_component.replace('_', ' ')
def remove_final_slash (directory : str) -> str:
    """Remove the final slash if it exists since it may cause
    problems when recognizing input directories.

    FIX: an empty string is now tolerated; the original indexing of
    directory[-1] raised IndexError on ''.
    """
    if directory.endswith('/'):
        return directory[:-1]
    return directory
# Project input files
# Console flag name -> Project getter resolving each input file
project_input_files = {
    'itopology': Project.get_input_topology_file,
    'inputs': Project.get_inputs_file,
    'populations': Project.get_populations_file,
    'transitions': Project.get_transitions_file
}
# MD input files
# Console flag name -> MD getter resolving each input file
md_input_files = {
    'istructure': MD.get_input_structure_file,
    'itrajectory': MD.get_input_trajectory_files
}
# Both project and MD input files
input_files = { **project_input_files, **md_input_files }

# Project processed files
project_processed_files = {
    'topology': Project.get_topology_file
}
# MD processed files
md_processed_files = {
    'structure': MD.get_structure_file,
    'trajectory': MD.get_trajectory_file
}
# Both project and MD processed files
processed_files = { **project_processed_files, **md_processed_files }
# Project requestable tasks
project_requestables = {
    **project_input_files,
    **project_processed_files,
}
# Add available tasks to project requestables
# FIX: the loop variable was named 'callable', shadowing the builtin
for member in vars(Project).values():
    if isinstance(member, Task): project_requestables[member.flag] = member
# MD requestable tasks
md_requestables = {
    **md_input_files,
    **md_processed_files,
}
# Add available tasks to MD requestables
for member in vars(MD).values():
    if isinstance(member, Task): md_requestables[member.flag] = member
# Requestables for the console
# Note that this constant is global
requestables.update({ **project_requestables, **md_requestables })
# Inverted requestables for every function to know which is its 'label'
inverted_requestables.update({ v: k for k, v in requestables.items() })
# Set groups of dependencies to be requested together using only one flag
# Flag name -> list of task flags the shortcut expands into
DEPENDENCY_FLAGS = {
    'download': list(input_files.keys()),
    'setup': list(processed_files.keys()),
    'meta': ['pmeta', 'mdmeta'],
    'network': [ 'mapping', 'ligands', 'chains', 'pdbs', 'membrane' ],
    'minimal': [ 'pmeta', 'mdmeta', 'stopology' ],
    'interdeps': [ 'interactions', 'pairwise', 'hbonds', 'energies', 'perres', 'clusters', 'dist' ],
    'membs': ['membranes', 'density', 'thickness', 'apl', 'lorder', 'linter']
}

# Set the default analyses to be run when no task is specified
DEFAULT_ANALYSES = ['clusters', 'dist', 'energies', 'hbonds', 'pca', 'pockets',
    'rgyr', 'rmsds', 'perres', 'pairwise', 'rmsf', 'sas', 'tmscore', 'density',
    'thickness', 'apl', 'lorder', 'linter']
# The actual main function
def workflow (
    # Project parameters, forwarded to the Project constructor
    # FIX: default to None instead of a mutable default dict
    project_parameters : Optional[dict] = None,
    # The actual workflow parameters
    # The working directory
    working_directory : str = '.',
    # Download only (deprecated: use include=['download'])
    download : bool = False,
    # Download and correct only (deprecated: use include=['setup'])
    setup : bool = False,
    # Run only specific analyses/processes
    include : Optional[List[str]] = None,
    # Run everything but specific analyses/processes
    exclude : Optional[List[str]] = None,
    # Overwrite already existing output files
    overwrite : Optional[ Union[ List[str], bool ] ] = None,
):
    """Run the whole workflow: build the project, resolve the requested
    tasks and run them, first at project level and then per MD.

    Raises InputError on incompatible or invalid arguments.
    """
    # Normalize the mutable default argument
    if project_parameters is None:
        project_parameters = {}

    # Check there are not input errors

    # Include and exclude are not compatible
    # This is to protect the user to do something which makes not sense
    if include and exclude:
        raise InputError('Include (-i) and exclude (-e) are not compatible. Use one of these options.')

    # Make sure the working directory exists
    if not exists(working_directory):
        raise InputError(f'Working directory "{working_directory}" does not exist')

    # Make sure the working directory is actually a directory
    if not isdir(working_directory):
        raise InputError(f'Working directory "{working_directory}" is actually not a directory')

    # Move the current directory to the working directory
    chdir(working_directory)
    current_directory_name = getcwd().split('/')[-1]
    print(f'\n{CYAN_HEADER}Running workflow for project at {current_directory_name}{COLOR_END}')

    # Initiate the project
    project = Project(**project_parameters)
    print(f' {len(project.mds)} MDs are to be run')

    # Set the tasks to be run
    tasks = None
    # If the download argument is passed then just make sure input files are available
    if download:
        warn('The "-d" or "--download" argument is deprecated. Please use "-i download" instead.')
        tasks = list(input_files.keys())
    # If the setup argument is passed then just process input files
    elif setup:
        warn('The "-s" or "--setup" argument is deprecated. Please use "-i setup" instead.')
        tasks = list(processed_files.keys())
    # If the include argument then add only the specified tasks to the list
    elif include and len(include) > 0:
        tasks = [ *include ]
        # Find for special flags among included
        for flag, dependencies in DEPENDENCY_FLAGS.items():
            if flag not in tasks: continue
            # If the flag is found then remove it and write the corresponding dependencies instead
            # Make sure not to duplicate a dependency if it was already included
            tasks.remove(flag)
            for dependency in dependencies:
                if dependency in tasks: continue
                tasks.append(dependency)
    # Set the default tasks otherwise
    else:
        tasks = [
            # Project tasks
            'stopology',
            'screenshot',
            'pmeta',
            'pdbs',
            'chains',
            # MD tasks
            'mdmeta',
            'inter',
            *DEFAULT_ANALYSES,
        ]
        # If the exclude parameter was passed then remove excluded tasks from the default tasks
        if exclude and len(exclude) > 0:
            tasks = [ name for name in tasks if name not in exclude ]

    # If the user requested to overwrite something, make sure it is in the tasks list

    # Update the overwritable variable with the requested overwrites
    overwritables = set()
    if overwrite:
        # If the overwrite argument is simply true then add all requestables to the overwritable
        # FIX: use isinstance instead of type() equality checks
        if isinstance(overwrite, bool):
            overwritables.update(tasks)
        # If the overwrite argument is a list of tasks then iterate them
        elif isinstance(overwrite, list):
            for task in overwrite:
                # Make sure the task to be overwriten is among the tasks to be run
                if task not in tasks:
                    raise InputError(f'Task "{task}" is to be overwriten but it is not in the tasks list. Either include it or do not exclude it')
                # Add it to the global variable
                overwritables.add(task)
        else: raise ValueError('Not supported overwrite type')

    # Get project tasks
    project_tasks = [ task for task in tasks if task in project_requestables ]
    # Get the MD tasks
    md_tasks = [ task for task in tasks if task in md_requestables ]

    # Set project overwritables
    project.overwritables = set([ task for task in project_tasks if task in overwritables ])
    # Set MD overwritables
    # Note that this must be done before running project tasks
    # Some project tasks rely in MD tasks
    for md in project.mds:
        md.overwritables = set([ task for task in md_tasks if task in overwritables ])

    # Run the project tasks now
    for task in project_tasks:
        # Get the function to be called and call it
        getter = requestables[task]
        getter(project)

    # If there are no MD tasks then we are done already
    if len(md_tasks) == 0:
        print("Finished!")
        return

    # Now iterate over the different MDs
    for md in project.mds:
        print(f'\n{CYAN_HEADER} Processing MD at {md.directory}{COLOR_END}')
        # Run the MD tasks
        for task in md_tasks:
            # Get the function to be called and call it
            getter = requestables[task]
            getter(md)
        # Remove gromacs backups and other trash files from this MD
        remove_trash(md.directory)

    # Remove gromacs backups and other trash files from the project
    remove_trash(project.directory)

    print("Done!")