Coverage for model_workflow/utils/file.py: 86%

115 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1from os import remove, symlink, rename, readlink 

2from os.path import exists, isabs, abspath, relpath, split, islink, normpath, getmtime, getsize 

3from time import strftime, gmtime 

4from shutil import copyfile 

5from typing import Optional 

6 

7from model_workflow.utils.constants import EXTENSION_FORMATS, PYTRAJ_SUPPORTED_FORMATS, PYTRAJ_PARM_FORMAT 

8from model_workflow.utils.constants import DATE_STYLE, GLOBALS 

9from model_workflow.utils.auxiliar import InputError 

10 

LOCAL_PATH = '.'


class File:
    """File handler class.

    Absolute paths are used in runtime.
    Relative paths are used to store paths.

    Attributes set by the constructor (all None when no path is given):
      absolute_path           -- absolute version of the input path
      relative_path           -- input path relative to LOCAL_PATH
      path                    -- normalized relative path; used for equality and hashing
      basepath                -- directory part of 'path' ('.' when the file is local)
      filename                -- last component of 'path'
      extension               -- text after the last dot in the filename, or None
      extensionless_filename  -- filename without the dot and the extension
      extensionless_filepath  -- path without the dot and the extension
    """

    def __init__(self, relative_or_basolute_path: Optional[str]):
        """Build the handler from a relative or absolute path.

        An empty or None path produces an "empty" file: every path-related
        attribute stays None and the instance evaluates to False.
        """
        # Declare all attributes as none by default
        self.absolute_path = self.relative_path = self.path = None
        self.basepath = self.filename = None
        self.extension = None
        self.extensionless_filename = None
        self.extensionless_filepath = None
        # Set internal values
        # This must happen before the early return below, otherwise asking
        # an empty file for its cksum would fail with an AttributeError
        self._cksum = None
        # If there is no path then leave everything as none
        if not relative_or_basolute_path:
            return
        # If input path is absolute
        # Note that the whole path must be checked, not only its first character
        # Otherwise paths starting with a drive letter would be taken as relative
        if isabs(relative_or_basolute_path):
            self.absolute_path = relative_or_basolute_path
            self.relative_path = relpath(self.absolute_path, LOCAL_PATH)
        # If it is relative
        else:
            self.relative_path = relative_or_basolute_path
            self.absolute_path = abspath(self.relative_path)
        # When simply a path is requested we return the relative path
        # Note that normalizing the path is essential to recognize same filepaths
        # Otherwise we could have './myfile' and 'myfile' considered as different filepaths
        self.path = normpath(self.relative_path)
        # Capture the filename and the basepath
        self.basepath, self.filename = split(self.path)
        # If the basepath is empty then it means the file is in the local directory
        # WARNING: If the basepath is left empty an exists(basepath) would return false
        # WARNING: For this reason we must replace '' by '.'
        if not self.basepath:
            self.basepath = LOCAL_PATH
        # Set the file extension
        # A filename with no dot at all has no extension
        self.extension = self.filename.split('.')[-1]
        if self.extension == self.filename:
            self.extension = None
        # Set the extensionless filename
        self.extensionless_filename = self.filename
        self.extensionless_filepath = self.path
        if self.extension:
            extension_size = len(self.extension) + 1  # We include here the dot
            self.extensionless_filename = self.filename[:-extension_size]
            self.extensionless_filepath = self.path[:-extension_size]

    # We must display the cksum here
    # Note that this is critical for the task args cksum when we handle lists of files
    # e.g. input_trajectory_files in process_input_files
    def __repr__(self) -> str:
        if not self.filename:
            return '< No file >'
        return f'< File {self.cksum} >'

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return hash(self.path)  # Path is already normalized

    def __bool__(self) -> bool:
        return bool(self.filename)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, self.__class__):
            return self.path == other.path  # Paths are already normalized
        return False

    def check_existence(self) -> bool:
        """Check if file exists. An empty file (no path) never exists."""
        if not self.path:
            return False
        return exists(self.path)
    exists = property(check_existence, None, None, "Does the file exists? (read only)")

    def get_format(self) -> Optional[str]:
        """Get file format based on the extension.

        Return None when the file has no extension.
        If the extension is not recognized then raise an InputError.
        """
        if not self.extension:
            return None
        extension_format = EXTENSION_FORMATS.get(self.extension, None)
        if not extension_format:
            raise InputError(f'Not recognized format extension "{self.extension}" from file "{self.filename}"')
        return extension_format
    format = property(get_format, None, None, "File standard format (read only)")

    def get_mtime(self) -> str:
        """Get the file last modification time, formatted with DATE_STYLE (GMT)."""
        raw_mtime = getmtime(self.path)
        return strftime(DATE_STYLE, gmtime(raw_mtime))
    mtime = property(get_mtime, None, None, "File last modification date (read only)")

    def get_size(self) -> int:
        """Get the file size in bytes."""
        return getsize(self.path)
    size = property(get_size, None, None, "File size in bytes (read only)")

    # DANI: This is provisional and it is not yet based in a cksum neither the file content
    def get_cksum(self) -> str:
        """Get a cksum code used to compare identical file content.

        The value is computed once, from the path, the modification time and
        the size, and then cached in the instance.
        """
        # If we already have an internal value then use it
        if self._cksum is not None:
            return self._cksum
        # Calculate it otherwise
        if not self.exists:
            self._cksum = f'missing {self.path}'
        else:
            self._cksum = f'{self.path} -> {self.mtime} {self.size}'
        return self._cksum
    cksum = property(get_cksum, None, None, "Cksum code used to compare identical file content (read only)")

    # Set a couple of additional functions according to pytraj format requirements
    def is_pytraj_supported(self) -> bool:
        """Check if the file format is among the pytraj supported formats."""
        return self.format in PYTRAJ_SUPPORTED_FORMATS

    def get_pytraj_parm_format(self) -> Optional[str]:
        """Get the pytraj parm format matching the file format, if any."""
        return PYTRAJ_PARM_FORMAT.get(self.format, None)

    def remove(self):
        """Remove the file."""
        remove(self.path)

    def get_standard_file(self) -> 'File':
        """Given a file who has non-standard extension of a supported format we set a symlink with the standard extension."""
        # If current file already has the extension then there is nothing to return
        if self.extension == self.format:
            return self
        return self.reformat(self.format)

    def reformat(self, new_extension: str) -> 'File':
        """Given a file and a new extension we set a symlink from a new file with that extension."""
        # Set the filename with the standard extension and initiate the file
        reformatted_filename = f'{self.extensionless_filepath}.{new_extension}'
        reformatted_file = File(reformatted_filename)
        # If standard file does not exist then set a symlink
        if not reformatted_file.exists:
            reformatted_file.set_symlink_to(self)
        return reformatted_file

    def get_prefixed_file(self, prefix: str) -> 'File':
        """Get a prefixed file using this file name as the name base."""
        return File(f'{self.basepath}/{prefix}{self.filename}')

    def get_neighbour_file(self, filename: str) -> 'File':
        """Get a file in the same path but with a different name."""
        return File(f'{self.basepath}/{filename}')

    def get_symlink(self) -> Optional['File']:
        """Get the symlink target of this file.

        Relative targets are resolved against this file directory.
        Absolute targets are returned as they are.
        """
        target_filepath = readlink(self.path)
        if not target_filepath:
            return None
        # An absolute target must not be prefixed with the basepath
        # Otherwise we would end up pointing to a non-existent relative path
        if isabs(target_filepath):
            return File(target_filepath)
        return File(self.basepath + '/' + target_filepath)

    def set_symlink_to(self, other_file: 'File'):
        """Set this file a symlink to another file.

        When symlinks are disabled in GLOBALS the other file is copied instead.
        Raise an Exception if this file already exists.
        """
        # Check if symlinks are allowed
        no_symlinks = GLOBALS['no_symlinks']
        # If symlinks are not allowed then copy the file instead
        if no_symlinks:
            other_file.copy_to(self)
            return
        # Self file must not exist
        if self.exists:
            raise Exception('Cannot set a symlink from an already existing file: ' + str(self))
        # Note that symlink path must be relative to this file
        relative_path = relpath(other_file.path, self.basepath)
        # Set the symlink
        symlink(relative_path, self.path)

    def is_symlink(self) -> bool:
        """Check if a file is already a symlink."""
        return islink(self.path)

    def copy_to(self, other_file: 'File'):
        """Copy a file to another."""
        copyfile(self.path, other_file.path)

    def rename_to(self, other_file: 'File'):
        """Rename a file to another."""
        rename(self.path, other_file.path)

189 """Rename a file to another.""" 

190 rename(self.path, other_file.path)