Coverage for mddb_workflow / utils / database.py: 66%
180 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1import urllib.request
2import urllib.error
3import ssl
4import json
5from tqdm import tqdm
6from mddb_workflow.utils.auxiliar import load_json, save_json, InputError, RemoteServiceError, warn
7from mddb_workflow.utils.constants import INCOMPLETE_PREFIX
8from mddb_workflow.utils.type_hints import *
# Size in bytes of every chunk read from a remote response while downloading
CHUNK_SIZE = 1024 ** 2 # 1 MB

# SSL context which skips the certificates authentication
# Hostname checking is disabled first, then the certificate verification itself
NO_SSL_CONTEXT = ssl.create_default_context()
NO_SSL_CONTEXT.check_hostname = False
NO_SSL_CONTEXT.verify_mode = ssl.CERT_NONE
class Remote:
    """Handler of a remote project hosted behind a database REST API.

    Every request is sent to an endpoint hanging from the project URL, which is
    built as '<database url>rest/current/projects/<accession>'.
    """

    def __init__(self, database : 'Database', accession : str, context = None):
        """Set the project URL and request the project data right away.

        Args:
            database: Parent database. Its 'url' and 'is_alive' are used here.
            accession: Accession of the project in the database.
            context: SSL context passed to every request (None for the default one).

        Raises:
            RemoteServiceError: If the database is not available.
            InputError: If the project is not found in the database (HTTP 404).
            Exception: If the project data request fails for any other reason.
        """
        self.database = database
        # Keep the accession since it is used when reporting a missing project
        self.accession = accession
        # Set the URL
        self.project_url = f'{self.database.url}rest/current/projects/{accession}'
        # Set the context
        self.context = context
        # Set internal variables
        self._project_data = None
        self._available_files = None
        # Download project data to make sure we have database access and the project exists
        self.get_project_data()

    def _stream_to_file (self, request_url : str, output_path : str, on_chunk = None):
        """Write the response of a request in a local file, chunk by chunk.

        The output file is opened only once the request has succeeded.
        Both the response and the file are closed even if the download fails.
        If 'on_chunk' is passed then it is called with the size of every chunk.
        """
        with urllib.request.urlopen(request_url, context=self.context) as response:
            with open(output_path, 'wb') as file:
                # An empty read means the response is exhausted
                for chunk in iter(lambda: response.read(CHUNK_SIZE), b''):
                    if on_chunk: on_chunk(len(chunk))
                    file.write(chunk)

    def get_project_data (self) -> dict:
        """Get the project data, which is requested to the API only once.

        This is only used to make sure the project exists by now.

        Raises:
            RemoteServiceError: If the database is not available.
            InputError: If the project is not found in the database (HTTP 404).
            Exception: If the request fails for any other reason.
        """
        # Return the internal value if we already have it
        if self._project_data is not None:
            return self._project_data
        # Make sure the database is alive (and thus the provided database URL is valid)
        if not self.database.is_alive():
            raise RemoteServiceError('Database not available')
        # Otherwise request the project data to the API
        try:
            with urllib.request.urlopen(self.project_url, context=self.context) as response:
                self._project_data = json.loads(response.read())
        except urllib.error.HTTPError as error:
            # Try to provide comprehensive error logs depending on the error
            # If project was not found
            if error.code == 404:
                raise InputError(f'Remote project "{self.accession}" not found in {self.project_url}') from error
            # If we don't know the error then simply say something went wrong
            raise Exception(f'Error when downloading project data: {self.project_url} with error: {error}') from error
        except urllib.error.URLError as error:
            # If we don't know the error then simply say something went wrong
            raise Exception(f'Error when downloading project data: {self.project_url} with error: {error}') from error
        except Exception as error:
            raise Exception(f'Something went wrong when requesting project data: {self.project_url}') from error
        return self._project_data
    project_data = property(get_project_data, None, None, "Project data (read only)")

    def get_snaphsots (self):
        """Get the number of snapshots in the remote trajectory.

        This is read from the 'mdFrames' field in the project metadata.
        """
        return self.get_project_data()['metadata']['mdFrames']
    snapshots = property(get_snaphsots, None, None, "Number of snapshots in the remote trajectory (read only)")

    def get_available_files (self):
        """Get the available files in the remote project, requested to the API only once.

        Raises:
            Exception: If the request fails or its response is not valid JSON.
        """
        # Return the internal value if we already have it
        if self._available_files is not None:
            return self._available_files
        # Otherwise request the available files to the API
        request_url = self.project_url + '/files'
        try:
            with urllib.request.urlopen(request_url, context=self.context) as response:
                self._available_files = json.loads(response.read())
        except Exception as error:
            raise Exception(f'Something went wrong when requesting available files: {request_url}') from error
        return self._available_files
    available_files = property(get_available_files, None, None, "Remote available files (read only)")

    def download_file (self, target_filename : str, output_file : 'File'):
        """Download a specific file from the project/files endpoint.

        Raises:
            Exception: If the API answers with an HTTP error (e.g. a missing remote file).
        """
        request_url = f'{self.project_url}/files/{target_filename}'
        print(f'Downloading file "{target_filename}" in {output_file.path}\n')
        try:
            self._stream_to_file(request_url, output_file.path)
        except urllib.error.HTTPError as error:
            if error.code == 404:
                raise Exception(f'Missing remote file "{target_filename}"') from error
            # If we don't know the error then simply say something went wrong
            raise Exception(f'Something went wrong when downloading file "{target_filename}" in {request_url}') from error

    def download_standard_topology (self, output_file : 'File'):
        """Download the project standard topology.

        Raises:
            Exception: If the download fails for any reason.
        """
        request_url = self.project_url + '/topology'
        print(f'Downloading standard topology ({output_file.path})\n')
        try:
            self._stream_to_file(request_url, output_file.path)
        except Exception as error:
            raise Exception(f'Something went wrong when downloading the standard topology: {request_url} with error: {error}') from error

    def download_standard_structure (self, output_file : 'File'):
        """Download the standard structure.

        Raises:
            Exception: If the download fails for any reason.
        """
        request_url = self.project_url + '/structure'
        print(f'Downloading standard structure ({output_file.path})\n')
        try:
            self._stream_to_file(request_url, output_file.path)
        except Exception as error:
            raise Exception(f'Something went wrong when downloading the standard structure: {request_url} with error: {error}') from error

    def download_trajectory (self,
        output_file : 'File',
        frame_selection : Optional[str] = None,
        atom_selection : Optional[str] = None,
        format : Optional[str] = None
    ):
        """Download the main trajectory.

        The trajectory is first written in a file prefixed as incomplete.
        This file is renamed to the output file only when the download is over.

        Args:
            output_file: File where the trajectory is to be written.
            frame_selection: Value of the 'frames' query argument, if any.
            atom_selection: Value of the 'atoms' query argument, if any.
            format: Value of the 'format' query argument, if any.

        Raises:
            Exception: If the download fails for any reason.
        """
        if frame_selection is None and atom_selection is None and format == 'xtc':
            # If we dont have a specific request, we can download the main trajectory
            # directly from the trajectory.xtc file so it is faster
            request_url = f'{self.project_url}/files/trajectory.xtc'
        else:
            # Set the base URL
            request_url = self.project_url + '/trajectory'
            # Additional arguments to be included in the URL
            candidates = (('frames', frame_selection), ('atoms', atom_selection), ('format', format))
            arguments = [ f'{key}={value}' for key, value in candidates if value ]
            if arguments:
                request_url += '?' + '&'.join(arguments)
        # Send the request
        print(f'Downloading main trajectory ({output_file.path})')
        # Create a temporal file to download the trajectory
        # Thus if the download is interrupted we will know the trajectory is incomplete
        incomplete_trajectory = output_file.get_prefixed_file(INCOMPLETE_PREFIX)
        # If we have a previous incomplete trajectory then remove it
        if incomplete_trajectory.exists: incomplete_trajectory.remove()
        try:
            # The progress bar is closed when leaving the context, also when the download fails
            with tqdm(unit = 'B', unit_scale = True, unit_divisor = 1024,
                miniters = 1, desc = ' Progress', leave=False) as pbar:
                self._stream_to_file(request_url, incomplete_trajectory.path, on_chunk = pbar.update)
        except Exception as error:
            raise Exception(f'Something went wrong when downloading the main trajectory: {request_url} with error: {error}') from error
        # Once the trajectory is fully downloaded we change its filename
        incomplete_trajectory.rename_to(output_file)

    def download_inputs_file (self, output_file : 'File'):
        """Download the inputs file.

        If the output file format is json then the format is specified in the
        query and the downloaded file is rewritten with indentation.

        Raises:
            Exception: If the download fails for any reason.
        """
        request_url = self.project_url + '/inputs'
        # In case this is a json file we must specify the format in the query
        is_json = output_file.format == 'json'
        if is_json:
            request_url += '?format=json'
        # Send the request
        print(f'Downloading inputs file ({output_file.path})\n')
        try:
            self._stream_to_file(request_url, output_file.path)
        except Exception as error:
            raise Exception(f'Something went wrong when downloading the inputs file: {request_url}') from error
        # If this is a json file then rewrite the inputs file in a pretty formatted way (with indentation)
        if is_json:
            file_content = load_json(output_file.path)
            save_json(file_content, output_file.path, indent = 4)

    def download_analysis_data(self, analysis_type: str, output_file: 'File'):
        """Download the data of a specific analysis and rewrite it as indented JSON.

        Raises:
            Exception: If the download or the JSON rewriting fails for any reason.
        """
        request_url = f'{self.project_url}/analyses/{analysis_type}'
        print(f'Downloading {analysis_type} analysis data\n')
        try:
            self._stream_to_file(request_url, output_file.path)
            # Format JSON if needed
            file_content = load_json(output_file.path)
            save_json(file_content, output_file.path, indent=4)
        except Exception as error:
            raise Exception(f'Something went wrong when retrieving {analysis_type} analysis: {request_url} with error: {error}') from error
class Database:
    """Entry point to a database REST API.

    The database URL is stored with a trailing slash and with no '/rest...' tail,
    since requests are built by appending 'rest/...' to it.
    """

    def __init__(self, url: str, no_ssl_authentication : bool = False):
        """Set the database base URL and the SSL context.

        Args:
            url: Database URL. Anything from '/rest' onwards is discarded.
            no_ssl_authentication: If True, requests skip SSL certificates authentication.
        """
        # If the URL already includes /rest/... then clean this part away
        base_url = url.split('/rest')[0]
        # Request URLs are built as '<url>rest/...' so the trailing slash is mandatory
        if not base_url.endswith('/'):
            base_url += '/'
        self.url = base_url
        # Set the context
        self.context = NO_SSL_CONTEXT if no_ssl_authentication else None

    def __str__ (self) -> str:
        return f'< Database {self.url} >'

    # WARNING: Note that this function requires internet connection
    # WARNING: Do not run in by default
    def is_alive (self) -> bool:
        """Check if the database is alive by requesting its URL.

        Returns:
            True if the request succeeds, False if it fails for any reason other
            than a SSL certificate verification failure.

        Raises:
            RemoteServiceError: If the SSL certificate can not be verified.
        """
        try:
            with urllib.request.urlopen(self.url, context=self.context) as response:
                response.read(1)
            return True
        except urllib.error.HTTPError as error:
            # Server error
            if error.code == 503:
                warn('MDDB Service unavailable. Please try again later.')
            # Either a known or an unknown HTTP error
            return False
        except urllib.error.URLError as error:
            # SSL error
            if 'SSL: CERTIFICATE_VERIFY_FAILED' in str(error):
                raise RemoteServiceError(f'Failed to verify SSL certificate from {self.url}\n'
                    ' Use the "--ssleep" flag to avoid SSL authentication if you trust the source.') from error
            # Timeout error
            # The 'errno' of the error itself is None (at least for timeout errors)
            # The actual value is found in the error reason
            # Note that the reason may be a plain string, which has no 'errno' at all
            if getattr(error.reason, 'errno', None) == 110:
                warn('Timeout error when requesting MDposit. Is the node fallen?')
            # Either a known or an unknown URL error
            return False
        except Exception:
            # Unknown error
            return False

    def get_remote_project (self, accession : str) -> 'Remote':
        """Instantiate the remote project handler."""
        return Remote(self, accession, context=self.context)

    def get_reference_data (self, reference : str, id : str) -> Optional[dict]:
        """Get the data of a reference if it is already in the MDDB database.

        Args:
            reference: Name of the references endpoint to be requested.
            id: Identifier of the reference inside this endpoint.

        Returns:
            The parsed reference data.
            None if the database is not alive or the reference is not found (HTTP 404).

        Raises:
            RuntimeError: If the request fails for any other reason.
        """
        # Make sure the database is alive (and thus the provided database URL is valid)
        # If not then we return None and allow the workflow to keep going
        # Probably if the reference can not be obtained the workflow will generate it again
        if not self.is_alive(): return None
        # Request the specific data
        request_url = f'{self.url}rest/v1/references/{reference}/{id}'
        try:
            with urllib.request.urlopen(request_url, context=self.context) as response:
                return json.loads(response.read().decode("utf-8", errors='ignore'))
        # Handle possible errors
        except urllib.error.HTTPError as error:
            # If the reference is not found in MDposit
            if error.code == 404: return None
            warn(f'Error when requesting MDposit: {request_url}')
            raise RuntimeError(f'Something went wrong with the MDposit request {request_url}') from error
        except Exception as error:
            raise RuntimeError(f'Something went wrong with the MDposit request {request_url}') from error