Coverage for model_workflow/utils/file.py: 86%
115 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1from os import remove, symlink, rename, readlink
2from os.path import exists, isabs, abspath, relpath, split, islink, normpath, getmtime, getsize
3from time import strftime, gmtime
4from shutil import copyfile
5from typing import Optional
7from model_workflow.utils.constants import EXTENSION_FORMATS, PYTRAJ_SUPPORTED_FORMATS, PYTRAJ_PARM_FORMAT
8from model_workflow.utils.constants import DATE_STYLE, GLOBALS
9from model_workflow.utils.auxiliar import InputError
# Reference directory used to compute relative paths and to replace empty basepaths
LOCAL_PATH = '.'


class File:
    """File handler class.

    Absolute paths are used in runtime.
    Relative paths are used to store paths.

    A file built from an empty path (None or '') is a "null file":
    all its path attributes are None and it evaluates to False.
    """

    def __init__ (self, relative_or_basolute_path : Optional[str]):
        """Parse the input path and set all path-derived attributes.

        Args:
            relative_or_basolute_path: Relative or absolute path to the file.
                The misspelled name is kept on purpose since it may be passed by keyword.
                If empty (None or '') then a null file is built.
        """
        # Declare all attributes as none by default
        self.absolute_path = self.relative_path = self.path = None
        self.basepath = self.filename = None
        self.extension = None
        self.extensionless_filename = None
        self.extensionless_filepath = None
        # Set internal values
        # Note that this must be done before the early return below
        # Otherwise asking a null file for its cksum would raise an AttributeError
        self._cksum = None
        # If there is no path then leave everything as none
        if not relative_or_basolute_path:
            return
        # If input path is absolute
        # Note that the whole path must be checked, not only its first character
        # Otherwise absolute paths not starting by a separator would be missed (e.g. 'C:\\dir' in Windows)
        if isabs(relative_or_basolute_path):
            self.absolute_path = relative_or_basolute_path
            self.relative_path = relpath(self.absolute_path, LOCAL_PATH)
        # If it is relative
        else:
            self.relative_path = relative_or_basolute_path
            self.absolute_path = abspath(self.relative_path)
        # When simply a path is requested we return the relative path
        # Note that normalizing the path is essential to recognize same filepaths
        # Otherwise we could have './myfile' and 'myfile' considered as different filepaths
        self.path = normpath(self.relative_path)
        # Capture the filename and the basepath
        self.basepath, self.filename = split(self.path)
        # If the basepath is empty then it means the file is in the local directory
        # WARNING: If the basepath is left empty an exists(basepath) would return false
        # WARNING: For this reason we must replace '' by '.'
        if not self.basepath:
            self.basepath = LOCAL_PATH
        # Set the file extension: whatever comes after the last dot
        # If there is no dot at all then the split returns the whole filename and there is no extension
        self.extension = self.filename.split('.')[-1]
        if self.extension == self.filename:
            self.extension = None
        # Set the extensionless filename
        self.extensionless_filename = self.filename
        self.extensionless_filepath = self.path
        if self.extension:
            extension_size = len(self.extension) + 1 # We include here the dot
            self.extensionless_filename = self.filename[:-extension_size]
            self.extensionless_filepath = self.path[:-extension_size]

    # We must display the cksum here
    # Note that this is critical for the task args cksum when we handle lists of files
    # e.g. input_trajectory_files in process_input_files
    def __repr__ (self) -> str:
        if not self.filename:
            return '< No file >'
        return f'< File {self.cksum} >'

    def __str__ (self) -> str:
        return self.__repr__()

    def __hash__ (self) -> int:
        return hash(self.path) # Path is already normalized

    def __bool__ (self) -> bool:
        return bool(self.filename)

    def __eq__ (self, other : 'File') -> bool:
        if isinstance(other, self.__class__):
            return self.path == other.path # Paths are already normalized
        return False

    def check_existence (self) -> bool:
        """Check if file exists. A null file never exists."""
        # Passing None to exists would raise a TypeError
        if not self.path:
            return False
        return exists(self.path)
    exists = property(check_existence, None, None, "Does the file exists? (read only)")

    def get_format (self) -> Optional[str]:
        """Get file format based on the extension.
        If there is no extension then return None.
        If the extension is not recognized then raise an InputError."""
        if not self.extension:
            return None
        extension_format = EXTENSION_FORMATS.get(self.extension, None)
        if not extension_format:
            raise InputError(f'Not recognized format extension "{self.extension}" from file "{self.filename}"')
        return extension_format
    format = property(get_format, None, None, "File standard format (read only)")

    def get_mtime (self) -> str:
        """Get the file last modification time, formatted according to DATE_STYLE."""
        raw_mtime = getmtime(self.path)
        return strftime(DATE_STYLE, gmtime(raw_mtime))
    mtime = property(get_mtime, None, None, "File last modification date (read only)")

    def get_size (self) -> int:
        """Get the file size in bytes."""
        return getsize(self.path)
    size = property(get_size, None, None, "File size in bytes (read only)")

    # DANI: This is provisional and it is not yet based in a cksum neither the file content
    def get_cksum (self) -> str:
        """Get a cksum code used to compare identical file content.
        The value is calculated once and then reused in further calls."""
        # If we already have an internal value then use it
        if self._cksum is not None:
            return self._cksum
        # Calculate it otherwise
        if not self.exists:
            self._cksum = f'missing {self.path}'
        else:
            self._cksum = f'{self.path} -> {self.mtime} {self.size}'
        return self._cksum
    cksum = property(get_cksum, None, None, "Cksum code used to compare identical file content (read only)")

    # Set a couple of additional functions according to pytraj format requirements
    def is_pytraj_supported (self) -> bool:
        """Check if the file format is among the pytraj supported formats."""
        return self.format in PYTRAJ_SUPPORTED_FORMATS

    def get_pytraj_parm_format (self) -> Optional[str]:
        """Get the pytraj parm format matching the file format, if any."""
        return PYTRAJ_PARM_FORMAT.get(self.format, None)

    def remove (self):
        """Remove the file."""
        # Note that this is the 'remove' function imported from os, not this method
        remove(self.path)

    def get_standard_file (self) -> 'File':
        """Given a file who has non-standard extension of a supported format we set a symlink with the standard extension."""
        # If current file already has the extension then there is nothing to return
        if self.extension == self.format:
            return self
        return self.reformat(self.format)

    def reformat (self, new_extension : str) -> 'File':
        """Given a file and a new extension we set a symlink from a new file with that extension."""
        # Set the filename with the standard extension and initiate the file
        reformatted_filename = f'{self.extensionless_filepath}.{new_extension}'
        reformatted_file = File(reformatted_filename)
        # If standard file does not exist then set a symlink
        if not reformatted_file.exists:
            reformatted_file.set_symlink_to(self)
        return reformatted_file

    def get_prefixed_file (self, prefix : str) -> 'File':
        """Get a prefixed file using this file name as the name base."""
        return File(f'{self.basepath}/{prefix}{self.filename}')

    def get_neighbour_file (self, filename : str) -> 'File':
        """Get a file in the same path but with a different name."""
        return File(f'{self.basepath}/{filename}')

    def get_symlink (self) -> Optional['File']:
        """Get the symlink target of this file.
        Return None if this file is not a symlink."""
        # Reading the link of a path which is not a symlink would raise an OSError
        if not self.path or not islink(self.path):
            return None
        target_filepath = readlink(self.path)
        if not target_filepath:
            return None
        # An absolute target is already a valid path by itself
        # Prepending the basepath to it would result in a wrong path
        if isabs(target_filepath):
            return File(target_filepath)
        # A relative target is relative to the directory where the symlink is
        return File(self.basepath + '/' + target_filepath)

    def set_symlink_to (self, other_file : 'File'):
        """Set this file a symlink to another file."""
        # Check if symlinks are allowed
        no_symlinks = GLOBALS['no_symlinks']
        # If symlinks are not allowed then copy the file instead
        if no_symlinks:
            other_file.copy_to(self)
            return
        # Self file must not exist
        if self.exists:
            raise Exception('Cannot set a symlink from an already existing file: ' + str(self))
        # Note that symlink path must be relative to this file
        relative_path = relpath(other_file.path, self.basepath)
        # Set the symlink
        symlink(relative_path, self.path)

    def is_symlink (self) -> bool:
        """Check if a file is already a symlink."""
        return islink(self.path)

    def copy_to (self, other_file : 'File'):
        """Copy a file to another."""
        copyfile(self.path, other_file.path)

    def rename_to (self, other_file : 'File'):
        """Rename a file to another."""
        rename(self.path, other_file.path)