Coverage for mddb_workflow / utils / file.py: 84%
129 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1from os import remove, symlink, rename, readlink
2from os.path import exists, isabs, abspath, relpath, split, islink, normpath, getmtime, getsize
3from time import strftime, gmtime
4from shutil import copyfile
5from typing import Optional
6from time import time
8import xxhash
10from mddb_workflow.utils.constants import EXTENSION_FORMATS, PYTRAJ_SUPPORTED_FORMATS, PYTRAJ_PARM_FORMAT
11from mddb_workflow.utils.constants import DATE_STYLE, GLOBALS
12from mddb_workflow.utils.auxiliar import InputError
# Reference directory: relative paths are computed against it and it is also
# used as the base path of files declared with no directory part
LOCAL_PATH = '.'
class File:
    """ File handler class.

    Absolute paths are used in runtime.
    Relative paths are used to store paths.
    """

    # Files heavier than this limit are only partially read when an unsafe cksum is requested
    CKSUM_UNSAFE_SIZE_LIMIT = 1024 * 1024 * 100 # 100 MB
    # Amount of bytes read at once while hashing, so memory usage stays bounded
    CKSUM_CHUNK_SIZE = 1024 * 1024 * 4 # 4 MB

    def __init__ (self, relative_or_absolute_path : str):
        """ Set all path-derived attributes from a relative or absolute path.

        Raises a RuntimeError if the path is empty. """
        # If there is no path then complain
        if not relative_or_absolute_path:
            raise RuntimeError('Declared file with no path')
        # Declare all attributes as none by default
        self.absolute_path = self.relative_path = self.path = None
        self.basepath = self.filename = None
        self.extension = None
        self.extensionless_filename = None
        self.extensionless_filepath = None
        # If input path is absolute
        # Note that the whole path is checked, not only its first character
        # Otherwise absolute paths starting with a drive would be taken as relative
        if isabs(relative_or_absolute_path):
            self.absolute_path = relative_or_absolute_path
            self.relative_path = relpath(self.absolute_path, LOCAL_PATH)
        # If it is relative
        else:
            self.relative_path = relative_or_absolute_path
            self.absolute_path = abspath(self.relative_path)
        # When simply a path is requested we return the relative path
        # Note that normalizing the path is essential to recognize same filepaths
        # Otherwise we could have './myfile' and 'myfile' considered as different filepaths
        self.path = normpath(self.relative_path)
        # Capture the filename and the basepath
        self.basepath, self.filename = split(self.path)
        # If the basepath is empty then it means the file is in the local directory
        # WARNING: If the basepath is left empty an exists(basepath) would return false
        # WARNING: For this reason we must replace '' by '.'
        if not self.basepath:
            self.basepath = LOCAL_PATH
        # Set the file extension: whatever comes after the last dot in the filename
        # If there is no dot at all then there is no extension
        self.extension = self.filename.split('.')[-1]
        if self.extension == self.filename:
            self.extension = None
        # Set the extensionless filename and filepath
        self.extensionless_filename = self.filename
        self.extensionless_filepath = self.path
        if self.extension:
            extension_size = len(self.extension) + 1 # We include here the dot
            self.extensionless_filename = self.filename[:-extension_size]
            self.extensionless_filepath = self.path[:-extension_size]
        # Set internal values
        # The cksum is calculated on demand and then cached here
        self._cksum = None

    # We must display the cksum here
    # Note that this is critical for the task args cksum when we handle lists of files
    # e.g. input_trajectory_files in process_input_files
    def __repr__ (self) -> str:
        """ Represent the file by its (possibly unsafe) cksum. """
        if not self.filename:
            return '< No file >'
        cksum = self.get_cksum(unsafe=True)
        return f'< File {cksum} >'

    def __str__ (self) -> str:
        return self.__repr__()

    def __hash__ (self) -> int:
        return hash(self.path) # Path is already normalized

    def __bool__ (self) -> bool:
        return bool(self.filename)

    def __eq__ (self, other : 'File') -> bool:
        """ Two files are equal when their normalized paths are equal. """
        if isinstance(other, self.__class__):
            return self.path == other.path # Paths are already normalized
        return False

    def check_existence (self) -> bool:
        """ Check if file exists. """
        return exists(self.path)
    exists = property(check_existence, None, None, "Does the file exists? (read only)")

    def get_format (self) -> Optional[str]:
        """ Get file format based on the extension.
        If there is no extension then return None.
        If the extension is not recognized then raise an InputError. """
        if not self.extension:
            return None
        extension_format = EXTENSION_FORMATS.get(self.extension, None)
        if not extension_format:
            raise InputError(f'Not recognized format extension "{self.extension}" from file "{self.filename}"')
        return extension_format
    format = property(get_format, None, None, "File standard format (read only)")

    def get_mtime (self) -> str:
        """ Get the file last modification time, formatted according to DATE_STYLE. """
        raw_mtime = getmtime(self.path)
        return strftime(DATE_STYLE, gmtime(raw_mtime))
    mtime = property(get_mtime, None, None, "File last modification date (read only)")

    def get_size (self) -> int:
        """ Get the file size in bytes. """
        return getsize(self.path)
    size = property(get_size, None, None, "File size in bytes (read only)")

    def get_cksum (self, unsafe : bool = False, verbose : bool = False) -> Optional[str]:
        """ Get a cksum code used to compare identical file content.

        Use the unsafe argument to make it way faster for large files by reading them partially.
        Use the verbose argument to print how long the calculation took.
        Return None if the file does not exist.

        NOTE: The first calculated cksum is cached and returned in further calls,
        no matter the value of the unsafe argument in those calls. """
        # If we already have a value then return it
        if self._cksum is not None:
            return self._cksum
        # If the file does not exist then there is no cksum
        if not self.exists:
            return None
        # Get the file size only once since every access is a new query to the file system
        size = self.size
        # Set if the cksum will be unsafe
        # Note that files lighter than the size limit will always have safe cksums
        is_unsafe = unsafe and size > self.CKSUM_UNSAFE_SIZE_LIMIT
        # Set how many bytes are yet to be read, where None means no limit
        remaining = self.CKSUM_UNSAFE_SIZE_LIMIT if is_unsafe else None
        if verbose:
            start_time = time()
        # Calculate the xxhash of the file content
        # Feed the hasher chunk by chunk so big files do not consume all memory
        # Note that the resulting hash is the same as when hashing all bytes at once
        hasher = xxhash.xxh64()
        with open(self.path, 'rb') as file:
            while remaining is None or remaining > 0:
                read_size = self.CKSUM_CHUNK_SIZE
                if remaining is not None:
                    read_size = min(read_size, remaining)
                chunk = file.read(read_size)
                # If there is nothing else to read then we are done
                if not chunk:
                    break
                hasher.update(chunk)
                if remaining is not None:
                    remaining -= len(chunk)
        final_xxhash = hasher.hexdigest()
        if verbose:
            end_time = time()
            total_time = end_time - start_time
            print(f'Got cksum for {self.path} ({size} Bytes) in {total_time:.2f} seconds -> {final_xxhash}')
        self._cksum = f'{size}-{"(UNSAFE)" if is_unsafe else ""}{final_xxhash}'
        return self._cksum

    # Set a couple of additional functions according to pytraj format requirements
    def is_pytraj_supported (self) -> bool:
        """ Check if the file format is among the pytraj supported formats. """
        return self.format in PYTRAJ_SUPPORTED_FORMATS

    def get_pytraj_parm_format (self) -> Optional[str]:
        """ Get the pytraj parm format matching the file format, if any. """
        return PYTRAJ_PARM_FORMAT.get(self.format, None)

    def remove (self):
        """ Remove the file. """
        remove(self.path)

    def get_standard_file (self) -> 'File':
        """ Given a file who has non-standard extension of a supported format we set a symlink with the standard extension. """
        # If current file already has the extension then there is nothing to return
        if self.extension == self.format:
            return self
        return self.reformat(self.format)

    def reformat (self, new_extension : str) -> 'File':
        """ Given a file and a new extension we set a symlink from a new file with that extension. """
        # Set the filename with the new extension and initiate the file
        reformatted_filename = f'{self.extensionless_filepath}.{new_extension}'
        reformatted_file = File(reformatted_filename)
        # If the reformatted file does not exist then set a symlink
        if not reformatted_file.exists:
            reformatted_file.set_symlink_to(self)
        return reformatted_file

    def get_prefixed_file (self, prefix : str) -> 'File':
        """ Get a prefixed file using this file name as the name base. """
        return File(f'{self.basepath}/{prefix}{self.filename}')

    def get_neighbour_file (self, filename : str) -> 'File':
        """ Get a file in the same path but with a different name. """
        return File(f'{self.basepath}/{filename}')

    def get_symlink (self) -> Optional['File']:
        """ Get the symlink target of this file.

        Note that reading the link fails with an OSError if this file is not a symlink. """
        target_filepath = readlink(self.path)
        if not target_filepath:
            return None
        # An absolute target does not depend on where the symlink is placed
        # Prepending the basepath to it would lead to a wrong path
        if isabs(target_filepath):
            return File(target_filepath)
        # A relative target is relative to the directory of the symlink
        return File(self.basepath + '/' + target_filepath)

    def set_symlink_to (self, other_file : 'File'):
        """ Set this file a symlink to another file.

        If symlinks are disabled in GLOBALS then the other file is copied instead. """
        # Check if symlinks are allowed
        no_symlinks = GLOBALS['no_symlinks']
        # If symlinks are not allowed then copy the file instead
        if no_symlinks:
            other_file.copy_to(self)
            return
        # Self file must not exist
        if self.exists:
            raise Exception('Cannot set a symlink from an already existing file: ' + str(self))
        # Note that symlink path must be relative to this file
        relative_path = relpath(other_file.path, self.basepath)
        # Set the symlink
        symlink(relative_path, self.path)

    def is_symlink (self) -> bool:
        """ Check if a file is already a symlink. """
        return islink(self.path)

    def copy_to (self, other_file : 'File'):
        """ Copy a file to another. """
        copyfile(self.path, other_file.path)

    def rename_to (self, other_file : 'File'):
        """ Rename a file to another. """
        rename(self.path, other_file.path)