Coverage for mddb_workflow/utils/file.py: 85%
113 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1from os import remove, symlink, rename, readlink
2from os.path import exists, isabs, abspath, relpath, split, islink, normpath, getmtime, getsize
3from time import strftime, gmtime
4from shutil import copyfile
5from typing import Optional
7from mddb_workflow.utils.constants import EXTENSION_FORMATS, PYTRAJ_SUPPORTED_FORMATS, PYTRAJ_PARM_FORMAT
8from mddb_workflow.utils.constants import DATE_STYLE, GLOBALS
9from mddb_workflow.utils.auxiliar import InputError
# Reference directory: relative paths are computed against it and it is also
# used as the basepath of files which have no directory component
LOCAL_PATH = '.'


class File:
    """File handler class.

    Absolute paths are used in runtime.
    Relative paths are used to store paths.

    A File built from an empty path (None or '') is a valid "no file"
    placeholder: all its path attributes are None and it evaluates to False.
    """

    def __init__(self, relative_or_basolute_path: Optional[str]):
        """Parse the input path and set all the path-derived attributes.

        Args:
            relative_or_basolute_path: Absolute or relative path to the file.
                If empty then every attribute is left as None.
        """
        # Declare all attributes as none by default
        self.absolute_path = self.relative_path = self.path = None
        self.basepath = self.filename = None
        self.extension = None
        self.extensionless_filename = None
        self.extensionless_filepath = None
        # Set internal values
        # This is done before the early return so the attribute always exists
        self._cksum = None
        # If there is no path then leave everything as none
        if not relative_or_basolute_path:
            return
        # If input path is absolute
        # Note that the whole path must be checked, not only its first character
        if isabs(relative_or_basolute_path):
            self.absolute_path = relative_or_basolute_path
            self.relative_path = relpath(self.absolute_path, LOCAL_PATH)
        # If it is relative
        else:
            self.relative_path = relative_or_basolute_path
            self.absolute_path = abspath(self.relative_path)
        # When simply a path is requested we return the relative path
        # Note that normalizing the path is essential to recognize same filepaths
        # Otherwise we could have './myfile' and 'myfile' considered as different filepaths
        self.path = normpath(self.relative_path)
        # Capture the filename and the basepath
        self.basepath, self.filename = split(self.path)
        # If the basepath is empty then it means the file is in the local directory
        # WARNING: If the basepath is left empty an exists(basepath) would return false
        # WARNING: For this reason we must replace '' by '.'
        if not self.basepath:
            self.basepath = LOCAL_PATH
        # Set the file extension: the text after the last dot in the filename
        # A filename with no dot or with nothing after its last dot has no extension
        _, dot, suffix = self.filename.rpartition('.')
        self.extension = suffix if dot and suffix else None
        # Set the extensionless filename and filepath
        self.extensionless_filename = self.filename
        self.extensionless_filepath = self.path
        if self.extension:
            extension_size = len(self.extension) + 1  # We include here the dot
            self.extensionless_filename = self.filename[:-extension_size]
            self.extensionless_filepath = self.path[:-extension_size]

    # We must display the cksum here
    # Note that this is critical for the task args cksum when we handle lists of files
    # e.g. input_trajectory_files in process_input_files
    def __repr__(self) -> str:
        if not self.filename:
            return '< No file >'
        return f'< File {self.cksum} >'

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return hash(self.path)  # Path is already normalized

    def __bool__(self) -> bool:
        return bool(self.filename)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, self.__class__):
            return self.path == other.path  # Paths are already normalized
        return False

    def check_existence(self) -> bool:
        """Check if file exists. A File with no path never exists."""
        # Guard the empty file: asking the filesystem for a None path would fail
        if not self.path:
            return False
        return exists(self.path)
    exists = property(check_existence, None, None, "Does the file exists? (read only)")

    def get_format(self) -> Optional[str]:
        """Get file format based on the extension.

        Returns:
            The format mapped to the extension in EXTENSION_FORMATS,
            or None if the file has no extension.

        Raises:
            InputError: If the extension is not recognized.
        """
        if not self.extension:
            return None
        extension_format = EXTENSION_FORMATS.get(self.extension, None)
        if not extension_format:
            raise InputError(f'Not recognized format extension "{self.extension}" from file "{self.filename}"')
        return extension_format
    format = property(get_format, None, None, "File standard format (read only)")

    def get_mtime(self) -> str:
        """Get the file last modification time, formatted with DATE_STYLE in GMT."""
        raw_mtime = getmtime(self.path)
        return strftime(DATE_STYLE, gmtime(raw_mtime))
    mtime = property(get_mtime, None, None, "File last modification date (read only)")

    def get_size(self) -> int:
        """Get the file size in bytes."""
        return getsize(self.path)
    size = property(get_size, None, None, "File size in bytes (read only)")

    # DANI: This is provisional and it is not yet based in a cksum neither the file content
    def get_cksum(self) -> str:
        """Get a cksum code used to compare identical file content.

        For now the code is built from the path, the modification time and the size.
        Missing files get a code which says so instead of failing.
        """
        if not self.exists:
            return f'missing {self.path}'
        return f'{self.path} -> {self.mtime} {self.size}'
    cksum = property(get_cksum, None, None, "Cksum code used to compare identical file content (read only)")

    # Set a couple of additional functions according to pytraj format requirements
    def is_pytraj_supported(self) -> bool:
        """Check if the file format is among PYTRAJ_SUPPORTED_FORMATS."""
        return self.format in PYTRAJ_SUPPORTED_FORMATS

    def get_pytraj_parm_format(self) -> Optional[str]:
        """Get the PYTRAJ_PARM_FORMAT entry for the file format, or None if there is none."""
        return PYTRAJ_PARM_FORMAT.get(self.format, None)

    def remove(self):
        """Remove the file."""
        # Note that inside the method body 'remove' is the function imported from os
        remove(self.path)

    def get_standard_file(self) -> 'File':
        """Given a file who has non-standard extension of a supported format we set a symlink with the standard extension."""
        # If current file already has the extension then there is nothing to return
        if self.extension == self.format:
            return self
        return self.reformat(self.format)

    def reformat(self, new_extension: str) -> 'File':
        """Given a file and a new extension we set a symlink from a new file with that extension."""
        # Set the filename with the standard extension and initiate the file
        reformatted_filename = f'{self.extensionless_filepath}.{new_extension}'
        reformatted_file = File(reformatted_filename)
        # If standard file does not exist then set a symlink
        if not reformatted_file.exists:
            reformatted_file.set_symlink_to(self)
        return reformatted_file

    def get_prefixed_file(self, prefix: str) -> 'File':
        """Get a prefixed file using this file name as the name base."""
        return File(f'{self.basepath}/{prefix}{self.filename}')

    def get_neighbour_file(self, filename: str) -> 'File':
        """Get a file in the same path but with a different name."""
        return File(f'{self.basepath}/{filename}')

    def get_symlink(self) -> Optional['File']:
        """Get the symlink target of this file.

        Relative targets are resolved against the directory of this file.
        Absolute targets are used as they are.
        Note that reading the link fails if this file is not a symlink.
        """
        target_filepath = readlink(self.path)
        if not target_filepath:
            return None
        # An absolute target must not be prefixed with the basepath
        if isabs(target_filepath):
            return File(target_filepath)
        return File(self.basepath + '/' + target_filepath)

    def set_symlink_to(self, other_file: 'File'):
        """Set this file a symlink to another file.

        If symlinks are disabled in GLOBALS then the other file is copied instead.

        Raises:
            Exception: If symlinks are allowed and this file already exists.
        """
        # Check if symlinks are allowed
        no_symlinks = GLOBALS['no_symlinks']
        # If symlinks are not allowed then copy the file instead
        if no_symlinks:
            other_file.copy_to(self)
            return
        # Self file must not exist
        if self.exists:
            raise Exception('Cannot set a symlink from an already existing file: ' + str(self))
        # Note that symlink path must be relative to this file
        relative_path = relpath(other_file.path, self.basepath)
        # Set the symlink
        symlink(relative_path, self.path)

    def is_symlink(self) -> bool:
        """Check if a file is already a symlink."""
        return islink(self.path)

    def copy_to(self, other_file: 'File'):
        """Copy a file to another."""
        copyfile(self.path, other_file.path)

    def rename_to(self, other_file: 'File'):
        """Rename a file to another."""
        rename(self.path, other_file.path)