Coverage for mddb_workflow / utils / file.py: 84%

129 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 18:45 +0000

1from os import remove, symlink, rename, readlink 

2from os.path import exists, isabs, abspath, relpath, split, islink, normpath, getmtime, getsize 

3from time import strftime, gmtime 

4from shutil import copyfile 

5from typing import Optional 

6from time import time 

7 

8import xxhash 

9 

10from mddb_workflow.utils.constants import EXTENSION_FORMATS, PYTRAJ_SUPPORTED_FORMATS, PYTRAJ_PARM_FORMAT 

11from mddb_workflow.utils.constants import DATE_STYLE, GLOBALS 

12from mddb_workflow.utils.auxiliar import InputError 

13 

# Path used to represent the current working directory
LOCAL_PATH = '.'


class File:
    """File handler class.

    Both the absolute and the relative (to the current working directory)
    paths are computed on construction. The normalized relative path is kept
    in ``path`` and it is the one used both to access the file and to compare
    and hash File instances.
    """

    # Files bigger than this limit are only partially read by "unsafe" cksums
    CKSUM_UNSAFE_SIZE_LIMIT = 1024 * 1024 * 100  # 100 MB
    # Amount of bytes read at once when hashing, so big files never get fully loaded in memory
    _CKSUM_CHUNK_SIZE = 1024 * 1024 * 8  # 8 MB

    def __init__(self, relative_or_absolute_path: str):
        """Parse the input path and set all path-derived attributes.

        Raises:
            RuntimeError: if the input path is empty or None.
        """
        # If there is no path then complain
        if not relative_or_absolute_path:
            raise RuntimeError('Declared file with no path')
        # Find out if the input path is absolute or relative and set the other one accordingly
        # Note that the whole path must be checked, not just its first character
        # Otherwise paths starting with a drive letter (e.g. 'C:\\data') would be taken as relative
        if isabs(relative_or_absolute_path):
            self.absolute_path = relative_or_absolute_path
            self.relative_path = relpath(self.absolute_path, LOCAL_PATH)
        else:
            self.relative_path = relative_or_absolute_path
            self.absolute_path = abspath(self.relative_path)
        # When simply a path is requested we return the relative path
        # Note that normalizing the path is essential to recognize same filepaths
        # Otherwise we could have './myfile' and 'myfile' considered as different filepaths
        self.path = normpath(self.relative_path)
        # Capture the filename and the basepath
        self.basepath, self.filename = split(self.path)
        # If the basepath is empty then it means the file is in the local directory
        # WARNING: If the basepath is left empty an exists(basepath) would return false
        # WARNING: For this reason we must replace '' by '.'
        if not self.basepath:
            self.basepath = LOCAL_PATH
        # Set the file extension: whatever comes after the last dot in the filename
        # If there is no dot at all then the split returns the whole filename, meaning there is no extension
        self.extension = self.filename.split('.')[-1]
        if self.extension == self.filename:
            self.extension = None
        # Set the extensionless filename and filepath
        self.extensionless_filename = self.filename
        self.extensionless_filepath = self.path
        if self.extension:
            extension_size = len(self.extension) + 1  # We include here the dot
            self.extensionless_filename = self.filename[:-extension_size]
            self.extensionless_filepath = self.path[:-extension_size]
        # Set internal values
        # Cached cksum, calculated the first time it is requested
        self._cksum = None

    # We must display the cksum here
    # Note that this is critical for the task args cksum when we handle lists of files
    # e.g. input_trajectory_files in process_input_files
    def __repr__(self) -> str:
        if not self.filename:
            return '< No file >'
        cksum = self.get_cksum(unsafe=True)
        return f'< File {cksum} >'

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return hash(self.path)  # Path is already normalized

    def __bool__(self) -> bool:
        return bool(self.filename)

    def __eq__(self, other: object) -> bool:
        """Two files are equal when their normalized relative paths match."""
        if isinstance(other, self.__class__):
            return self.path == other.path  # Paths are already normalized
        return False

    def check_existence(self) -> bool:
        """Check if file exists."""
        return exists(self.path)
    exists = property(check_existence, None, None, "Does the file exists? (read only)")

    def get_format(self) -> Optional[str]:
        """Get file format based on the extension.

        Return None if the file has no extension.

        Raises:
            InputError: if the extension is not among the recognized ones.
        """
        if not self.extension:
            return None
        extension_format = EXTENSION_FORMATS.get(self.extension, None)
        if not extension_format:
            raise InputError(f'Not recognized format extension "{self.extension}" from file "{self.filename}"')
        return extension_format
    format = property(get_format, None, None, "File standard format (read only)")

    def get_mtime(self) -> str:
        """Get the file last modification time, formatted according to DATE_STYLE (GMT)."""
        raw_mtime = getmtime(self.path)
        return strftime(DATE_STYLE, gmtime(raw_mtime))
    mtime = property(get_mtime, None, None, "File last modification date (read only)")

    def get_size(self) -> int:
        """Get the file size in bytes."""
        return getsize(self.path)
    size = property(get_size, None, None, "File size in bytes (read only)")

    def get_cksum(self, unsafe: bool = False, verbose: bool = False) -> Optional[str]:
        """Get a cksum code used to compare identical file content.

        The cksum has the format '<size in bytes>-<xxh64 hexdigest>'.
        Use the unsafe argument to make it way faster for large files by reading them partially:
        only the first CKSUM_UNSAFE_SIZE_LIMIT bytes are hashed and the digest is flagged with '(UNSAFE)'.
        Files not bigger than the size limit always get a safe cksum.

        Return None if the file does not exist.
        """
        # If we already have a value then return it
        # NOTE(review): the cached value is returned no matter the 'unsafe' argument
        # NOTE(review): thus a safe request after an unsafe one may return an unsafe cksum - confirm this is intended
        if self._cksum is not None:
            return self._cksum
        # If the file does not exist then there is no cksum
        if not self.exists:
            return None
        # Get the size just once so we do not stat the file over and over
        size = self.size
        # Set if the cksum will be unsafe
        # Note that files lighter than the size limit will always have safe cksums
        is_unsafe = unsafe and size > self.CKSUM_UNSAFE_SIZE_LIMIT
        # Calculate the xxhash of the file content
        # The file is read by chunks, so memory usage stays bounded no matter the file size
        # Note that feeding the hasher chunk by chunk yields the same digest than feeding it all at once
        if verbose:
            start_time = time()
        hasher = xxhash.xxh64()
        # Number of bytes left to be hashed, or None when the whole file is to be hashed
        remaining = self.CKSUM_UNSAFE_SIZE_LIMIT if is_unsafe else None
        with open(self.path, 'rb') as file:
            while remaining is None or remaining > 0:
                read_size = self._CKSUM_CHUNK_SIZE
                if remaining is not None:
                    read_size = min(read_size, remaining)
                chunk = file.read(read_size)
                # An empty chunk means we reached the end of the file
                if not chunk:
                    break
                hasher.update(chunk)
                if remaining is not None:
                    remaining -= len(chunk)
        final_xxhash = hasher.hexdigest()
        if verbose:
            end_time = time()
            total_time = end_time - start_time
            print(f'Got cksum for {self.path} ({size} Bytes) in {total_time:.2f} seconds -> {final_xxhash}')
        self._cksum = f'{size}-{"(UNSAFE)" if is_unsafe else ""}{final_xxhash}'
        return self._cksum

    # Set a couple of additional functions according to pytraj format requirements
    def is_pytraj_supported(self) -> bool:
        """Check if the file format is among the pytraj supported formats."""
        return self.format in PYTRAJ_SUPPORTED_FORMATS

    def get_pytraj_parm_format(self) -> Optional[str]:
        """Get the pytraj parm format matching the file format, or None if there is no match."""
        return PYTRAJ_PARM_FORMAT.get(self.format, None)

    def remove(self):
        """Remove the file."""
        # Note that this calls the 'remove' function imported from os, not this method
        remove(self.path)

    def get_standard_file(self) -> 'File':
        """Given a file who has non-standard extension of a supported format we set a symlink with the standard extension."""
        # If current file already has the extension then there is nothing to return
        if self.extension == self.format:
            return self
        return self.reformat(self.format)

    def reformat(self, new_extension: str) -> 'File':
        """Given a file and a new extension we set a symlink from a new file with that extension."""
        # Set the filename with the new extension and initiate the file
        reformatted_filename = f'{self.extensionless_filepath}.{new_extension}'
        reformatted_file = File(reformatted_filename)
        # If the reformatted file does not exist then set a symlink
        if not reformatted_file.exists:
            reformatted_file.set_symlink_to(self)
        return reformatted_file

    def get_prefixed_file(self, prefix: str) -> 'File':
        """Get a prefixed file using this file name as the name base."""
        return File(f'{self.basepath}/{prefix}{self.filename}')

    def get_neighbour_file(self, filename: str) -> 'File':
        """Get a file in the same path but with a different name."""
        return File(f'{self.basepath}/{filename}')

    def get_symlink(self) -> Optional['File']:
        """Get the symlink target of this file.

        Relative targets are resolved from the directory of this file.
        Absolute targets are used as they are.
        """
        target_filepath = readlink(self.path)
        if not target_filepath:
            return None
        # An absolute target must not be prefixed with the basepath or we would point to a wrong path
        if isabs(target_filepath):
            return File(target_filepath)
        return File(self.basepath + '/' + target_filepath)

    def set_symlink_to(self, other_file: 'File'):
        """Set this file a symlink to another file.

        If symlinks are disabled (GLOBALS['no_symlinks']) then the other file is copied instead.

        Raises:
            Exception: if symlinks are allowed and this file already exists.
        """
        # Check if symlinks are allowed
        no_symlinks = GLOBALS['no_symlinks']
        # If symlinks are not allowed then copy the file instead
        if no_symlinks:
            other_file.copy_to(self)
            return
        # Self file must not exist
        if self.exists:
            raise Exception('Cannot set a symlink from an already existing file: ' + str(self))
        # Note that symlink path must be relative to this file
        relative_path = relpath(other_file.path, self.basepath)
        # Set the symlink
        symlink(relative_path, self.path)

    def is_symlink(self) -> bool:
        """Check if a file is already a symlink."""
        return islink(self.path)

    def copy_to(self, other_file: 'File'):
        """Copy a file to another."""
        copyfile(self.path, other_file.path)

    def rename_to(self, other_file: 'File'):
        """Rename a file to another."""
        rename(self.path, other_file.path)

210 """ Rename a file to another. """ 

211 rename(self.path, other_file.path)