Coverage for mddb_workflow/tools/generate_map.py: 65%
515 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 15:48 +0000
1import sys
2import json
3import urllib.request
4import numpy as np
5import re
7from mddb_workflow.utils.auxiliar import protein_residue_name_to_letter, NoReferableException
8from mddb_workflow.utils.auxiliar import InputError, warn, load_json, save_json, request_pdb_data
9from mddb_workflow.utils.constants import REFERENCE_SEQUENCE_FLAG, NO_REFERABLE_FLAG, NOT_FOUND_FLAG
10from mddb_workflow.utils.file import File
11from mddb_workflow.utils.type_hints import *
13import xmltodict
15from Bio import Align
16from Bio.Blast import NCBIWWW
17from Bio.Align import substitution_matrices, Alignment
19# Set generic sequences which should be similar to known antibodies
20REFERENCE_ANTIBODY_SEQUENCES = {
21 # Single domain antibody
22 # This sequence has been designed with the aid of DeepSeek
23 # CDR regionds have been replaced by X
24 'QVQLVESGGGLVQPGGSLRLSCAASXXXXXXXWYRQAPGKEREFVAXXXXXXRFTISRDNAKNTVYLQMNSLKPEDTAVYYCXXXXXXXXXXWGQGTQVTVSS'
25}
28# Save current stderr to further restore it
29stderr_backup = sys.stderr
30# Suppress stderr
31#sys.stderr = None
32# Import the function
33#from Bio.SubsMat import MatrixInfo
34# Restore stderr
35#sys.stderr = stderr_backup
37# AGUS: Necesitamos que la secuencia de aa alineada tenga gaps identificados con guiones y pairwaise2 (biopython < 1.80) no lo hace
38# AGUS: Para actualizar a biopython >= 1.80 pairwaise2 ya no existe y no podemos obtener el mismo outoput que necesitábamos
39# AGUS: Esta función parece resolver el problema: https://github.com/biopython/biopython/issues/4769
40# Set the aligner
41aligner = Align.PairwiseAligner(mode='local')
42aligner.substitution_matrix = substitution_matrices.load("BLOSUM62")
43aligner.open_gap_score = -10
44aligner.extend_gap_score = -0.5
45def add_leading_and_trailing_gaps(alignment: Alignment) -> Alignment:
46 coords = alignment.coordinates
48 # We need to add two columns on each side of the coordinates matrix.
49 # The first column will always point to the start of the sequences,
50 # hence filled with zeros.
51 new_coords = np.zeros((2, coords.shape[1] + 4), dtype=int)
53 # The last w column will always point to the end of sequences,
54 # hence filled with the length of the respective sequences.
55 target_len = len(alignment.sequences[0])
56 query_len = len(alignment.sequences[1])
57 last_col = np.array([target_len, query_len])
58 new_coords[:, -1] = last_col
60 # The middle columns will contain the original coordinates.
61 new_coords[:, 2:-2] = coords
63 # The second and one before last columns will point to the end of sequence,
64 # with less unaligned positions.
65 new_coords[:, 1] = coords[:, 0] - coords[:, 0].min()
66 new_coords[:, -2] = coords[:, -1] + (last_col - coords[:, -1]).min()
67 return Alignment(sequences=alignment.sequences, coordinates=new_coords)
69# Map the structure aminoacids sequences against the standard reference sequences
70# References are uniprot accession ids and they are optional
71# For each reference, align the reference sequence with the topology sequence
72# Chains which do not match any reference sequence will be blasted
73# Note that an internet connection is required both to retireve the uniprot reference sequence and to do the blast
74# NEVER FORGET: This system relies on the fact that topology chains are not repeated
75def generate_protein_mapping (
76 structure : 'Structure',
77 output_filepath : str,
78 database_url : str,
79 cache : 'Cache',
80 register : dict,
81 mercy : list[str] = [],
82 input_protein_references : list | dict = [],
83 pdb_ids : list[str] = [],
84) -> dict:
85 """Map the structure aminoacids sequences against the Uniprot reference sequences."""
86 # Set the output file
87 protein_references_file = File(output_filepath)
89 # Remove previous warnings, if any
90 register.remove_warnings(REFERENCE_SEQUENCE_FLAG)
91 # Forced references must be list or dict
92 # If it is none then we set it as an empty list
93 if input_protein_references == None:
94 input_protein_references = []
95 # If forced references is a list of dictionaries then it means the input is wrongly formatted
96 # This may happen since the inputs file is in YAML format, and a simple hyphen makes the difference
97 # We can fix it from here anyway
98 if type(input_protein_references) == list and len(input_protein_references) > 0 and type(input_protein_references[0]) == dict:
99 input_protein_references = { k: v for fr in input_protein_references for k, v in fr.items() }
100 # Check if the forced references are strict (i.e. reference per chain, as a dictionary) or flexible (list of references)
101 strict_references = type(input_protein_references) == dict
102 # Check the "no referable" flag not to be passed when references are not strict
103 if not strict_references and NO_REFERABLE_FLAG in input_protein_references:
104 raise SystemExit('WRONG INPUT: The "no referable" flag cannot be passed in a list.' \
105 ' You must use a chain keys dictionary (e.g. {"A":"' + NO_REFERABLE_FLAG + '"})')
106 # Store all the references which are got through this process
107 # Note that not all references may be used at the end
108 references = {}
109 # Given a uniprot accession, get the reference object
110 # Try first asking to the MDposit database in case the reference exists already
111 # If not, retrieve UniProt data and build the reference object
112 # Return also a boolean to set if the reference already existed (True) or not (False)
113 def get_reference (uniprot_accession : str) -> tuple[dict, bool]:
114 reference = references.get(uniprot_accession, None)
115 if reference:
116 return reference, True
117 reference = get_mdposit_reference(uniprot_accession, database_url)
118 if reference:
119 return reference, True
120 reference = get_uniprot_reference(uniprot_accession)
121 return reference, False
122 # Import local references, in case the references json file already exists
123 imported_references = None
124 if protein_references_file.exists:
125 imported_references = import_references(protein_references_file)
126 # Append the imported references to the overall references pool
127 for k,v in imported_references.items():
128 references[k] = v
129 # Get the structure chain sequences
130 parsed_chains = get_parsed_chains(structure)
131 # Find out which chains are protein
132 protein_parsed_chains = []
133 for chain_data in parsed_chains:
134 sequence = chain_data['sequence']
135 if next((letter for letter in sequence if letter != 'X'), None):
136 chain_data['match'] = { 'ref': None, 'map': None, 'score': 0 }
137 protein_parsed_chains.append(chain_data)
138 # If there are no protein sequences then there is no need to map anything
139 if len(protein_parsed_chains) == 0:
140 print(' There are no protein sequences')
141 return protein_parsed_chains
142 # For each input forced reference, get the reference sequence
143 reference_sequences = {}
144 # Save already tried alignments to not repeat the alignment further
145 tried_alignments = { chain_data['name']: [] for chain_data in protein_parsed_chains }
146 # Set a function to try to match all protein sequences with the available reference sequences
147 # In case of match, objects in the 'protein_parsed_chains' list are modified by adding the result
148 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not
149 def match_sequences () -> bool:
150 # Track each chain-reference alignment match and keep the score of successful alignments
151 # Now for each structure sequence, align all reference sequences and keep the best alignment (if it meets the minimum)
152 for chain_data in protein_parsed_chains:
153 chain = chain_data['name']
154 chain_tried_alignments = tried_alignments[chain]
155 # In case references are forced per chain check if there is a reference for this chain and match according to this
156 if strict_references:
157 # Get the forced specific chain for this sequence, if any
158 forced_reference = input_protein_references.get(chain, None)
159 if forced_reference:
160 # If the chain has a specific forced reference then we must align it just once
161 # Skip this process in further matches
162 if chain_data['match']['ref']:
163 continue
164 # In case the forced reference is the "no referable" flag
165 # Thus it has no reference sequence and we must not try to match it
166 # Actually, any match would be accidental and not correct
167 if forced_reference == NO_REFERABLE_FLAG:
168 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG }
169 continue
170 # In case the forced reference is the "not found" flag
171 # This should not happend but we may be using references as forced references, so just in case
172 if forced_reference == NOT_FOUND_FLAG:
173 chain_data['match'] = { 'ref': NOT_FOUND_FLAG }
174 continue
175 # Get the forced reference sequence and align it to the chain sequence in order to build the map
176 reference_sequence = reference_sequences[forced_reference]
177 print(f' Aligning chain {chain} with {forced_reference} reference sequence')
178 align_results = align(reference_sequence, chain_data['sequence'])
179 # The align must match or we stop here and warn the user
180 if not align_results:
181 raise SystemExit(f'Forced reference {chain} -> {forced_reference} does not match in sequence')
182 sequence_map, align_score = align_results
183 reference = references[forced_reference]
184 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score }
185 continue
186 # If the chain has already a match and this match is among the forced references then stop here
187 # Forced references have priority and this avoids having a match with a not forced reference further
188 # Same behaviour if the match is with an unreferable sequence
189 if chain_data['match']['ref'] and ( chain_data['match']['ref'] == NO_REFERABLE_FLAG
190 or chain_data['match']['ref'] == NOT_FOUND_FLAG
191 or chain_data['match']['ref']['uniprot'] in input_protein_references):
192 continue
193 # Iterate over the different available reference sequences
194 for uniprot_id, reference_sequence in reference_sequences.items():
195 # If this alignment has been tried already then skip it
196 if uniprot_id in chain_tried_alignments:
197 continue
198 # Align the structure sequence with the reference sequence
199 # NEVER FORGET: This system relies on the fact that topology chains are not repeated
200 print(f' Aligning chain {chain} with {uniprot_id} reference sequence')
201 align_results = align(reference_sequence, chain_data['sequence'])
202 tried_alignments[chain].append(uniprot_id) # Save the alignment try, no matter if it works or not
203 if not align_results:
204 continue
205 # In case we have a valid alignment, check the alignment score is better than the current reference score (if any)
206 sequence_map, align_score = align_results
207 current_reference = chain_data['match']
208 if current_reference['score'] > align_score:
209 continue
210 # If the match is a 'no referable' exception then set a no referable flag
211 if type(uniprot_id) == NoReferableException:
212 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG }
213 continue
214 # Proceed to set the corresponding reference toherwise
215 reference = references[uniprot_id]
216 # If the alignment is better then we impose the new reference
217 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score }
218 # Sum up the current matching
219 print(' Protein reference summary:')
220 for chain_data in parsed_chains:
221 name = chain_data['name']
222 match = chain_data.get('match', None)
223 if not match:
224 print(f' {name} -> Not protein')
225 continue
226 reference = chain_data['match'].get('ref', None)
227 if not reference:
228 print(f' {name} -> ¿?')
229 continue
230 if reference == NO_REFERABLE_FLAG:
231 print(f' {name} -> No referable')
232 continue
233 if reference == NOT_FOUND_FLAG:
234 print(f' {name} -> Not found')
235 continue
236 uniprot_id = reference['uniprot']
237 print(f' {name} -> {uniprot_id}')
238 # Export already matched references
239 export_references(protein_parsed_chains, protein_references_file)
240 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not
241 allright = all([ chain_data['match']['ref'] for chain_data in protein_parsed_chains ])
242 # If we match all chains then make sure there is no forced reference missing which did not match
243 # Otherwise stop here and force the user to remove these forced uniprot ids from the inputs file
244 # WARNING: Although we could move on it is better to stop here and warn the user to prevent a future silent problem
245 if allright and input_protein_references:
246 # Get forced uniprot ids
247 forced_uniprot_ids = set(list(input_protein_references.values()) if strict_references else input_protein_references)
248 forced_uniprot_ids -= { NOT_FOUND_FLAG, NO_REFERABLE_FLAG }
249 #forced_uniprot_ids.remove(NO_REFERABLE_FLAG)
250 #forced_uniprot_ids.remove(NOT_FOUND_FLAG)
251 # Get matched uniprot ids
252 matched_references = [ chain_data['match']['ref'] for chain_data in protein_parsed_chains ]
253 matched_uniprot_ids = set([ ref['uniprot'] for ref in matched_references if type(ref) == dict ])
254 # Check the difference
255 unmatched_uniprot_ids = forced_uniprot_ids - matched_uniprot_ids
256 if len(unmatched_uniprot_ids) > 0:
257 log = ', '.join(unmatched_uniprot_ids)
258 raise InputError(f'Some forced references were not matched with any protein sequence: {log}\n'
259 ' Please remove them from the inputs file')
260 return allright
261 # --- End of match_sequences function --------------------------------------------------------------------------------
262 # First use the forced references for the matching
263 if input_protein_references:
264 forced_uniprot_ids = list(input_protein_references.values()) if strict_references else input_protein_references
265 for uniprot_id in forced_uniprot_ids:
266 # If instead of a uniprot id there is a 'no referable' flag then we skip this process
267 if uniprot_id == NO_REFERABLE_FLAG:
268 continue
269 # If instead of a uniprot id there is a 'not found' flag then we skip this process
270 # This should not happend but we may be using references as forced references, so just in case
271 if uniprot_id == NOT_FOUND_FLAG:
272 continue
273 # If reference is already in the list (i.e. it has been imported) then skip this process
274 reference = references.get(uniprot_id, None)
275 if reference:
276 reference_sequences[uniprot_id] = reference['sequence']
277 continue
278 # Find the reference data for the given uniprot id
279 reference, already_loaded = get_reference(uniprot_id)
280 reference_sequences[uniprot_id] = reference['sequence']
281 # Save the current whole reference object for later
282 references[reference['uniprot']] = reference
283 # Now that we have all forced references data perform the matching
284 # If we have every protein chain matched with a reference then we stop here
285 print(' Using only forced references from the inputs file')
286 if match_sequences():
287 return protein_parsed_chains
288 # Now add the imported references to reference sequences. Thus now they will be 'matchable'
289 # Thus now they will be 'matchable', so try to match sequences again in case any of the imported references has not been tried
290 # It was not done before since we want forced references to have priority
291 if imported_references:
292 need_rematch = False
293 for uniprot_id, reference in imported_references.items():
294 # If the imported reference has been aligned already (i.e. it was a forced reference)
295 if uniprot_id in reference_sequences:
296 continue
297 # Otherwise, include it
298 need_rematch = True
299 reference_sequences[uniprot_id] = reference['sequence']
300 # If there was at least one imported reference missing then rerun the matching
301 if need_rematch:
302 print(' Using also references imported from references.json')
303 if match_sequences():
304 return protein_parsed_chains
305 # If there are still any chain which is not matched with a reference then we need more references
306 # To get them, retrieve all uniprot ids associated to the pdb ids, if any
307 if pdb_ids and len(pdb_ids) > 0:
308 for pdb_id in pdb_ids:
309 # Ask PDB
310 uniprot_ids = pdb_to_uniprot(pdb_id)
311 for uniprot_id in uniprot_ids:
312 # If this is not an actual UniProt, but a no referable exception, then handle it
313 # We must find the matching sequence and set the corresponding chain as no referable
314 if type(uniprot_id) == NoReferableException:
315 reference_sequences[uniprot_id] = uniprot_id.sequence
316 continue
317 # Build a new reference from the resulting uniprot
318 reference, already_loaded = get_reference(uniprot_id)
319 if reference == None:
320 continue
321 reference_sequences[reference['uniprot']] = reference['sequence']
322 # Save the current whole reference object for later
323 references[reference['uniprot']] = reference
324 # If we have every protein chain matched with a reference already then we stop here
325 print(' Using also references related to PDB ids from the inputs file')
326 if match_sequences():
327 return protein_parsed_chains
328 # If there are still any chain which is not matched with a reference then we need more references
329 # To get them, we run a blast with each orphan chain sequence
330 for chain_data in protein_parsed_chains:
331 # Skip already references chains
332 if chain_data['match']['ref']:
333 continue
334 # Run the blast
335 sequence = chain_data['sequence']
336 uniprot_id = blast(sequence, cache)
337 if not uniprot_id:
338 chain_data['match'] = { 'ref': NOT_FOUND_FLAG }
339 continue
340 # Build a new reference from the resulting uniprot
341 reference, already_loaded = get_reference(uniprot_id)
342 reference_sequences[reference['uniprot']] = reference['sequence']
343 # Save the current whole reference object for later
344 references[reference['uniprot']] = reference
345 # If we have every protein chain matched with a reference already then we stop here
346 print(' Using also references from blast')
347 if match_sequences():
348 return protein_parsed_chains
349 # At this point we should have macthed all sequences
350 # If not, kill the process unless mercy was given
351 must_be_killed = REFERENCE_SEQUENCE_FLAG not in mercy
352 if must_be_killed:
353 raise SystemExit('BLAST failed to find a matching reference sequence for at least one protein sequence')
354 warn('BLAST failed to find a matching reference sequence for at least one protein sequence')
355 register.add_warning(REFERENCE_SEQUENCE_FLAG, 'There is at least one protein region which is not mapped to any reference sequence')
356 return protein_parsed_chains
358# Export reference objects data to a json file
359# This file is used by the loader to load new references to the database
360# Note that all references are saved to this json file, even those which are already in the database
361# It is the loader who is the responsible to check which references must be loaded and which ones are loaded already
362# Note that mapping data (i.e. which residue belongs to each reference) is not saved
363def export_references (mapping_data : list, protein_references_file : 'File'):
364 final_references = []
365 final_uniprots = []
366 for data in mapping_data:
367 match = data['match']
368 ref = match['ref']
369 if not ref or ref == NO_REFERABLE_FLAG or ref == NOT_FOUND_FLAG:
370 continue
371 uniprot = ref['uniprot']
372 if uniprot in final_uniprots:
373 continue
374 final_references.append(ref)
375 final_uniprots.append(uniprot)
376 # If there are no references to exported then do not egenrate the json file
377 if len(final_references) == 0:
378 return
379 # Write references to a json file
380 save_json(final_references, protein_references_file.path, indent = 4)
382# Import reference json file so we do not have to rerun this process
383def import_references (protein_references_file : 'File') -> list:
384 print(' Importing references from ' + protein_references_file.path)
385 # Read the file
386 file_content = load_json(protein_references_file.path)
387 # Format data as the process expects to find it
388 references = {}
389 for reference in file_content:
390 uniprot = reference['uniprot']
391 references[uniprot] = reference
392 return references
394# Get each chain name and aminoacids sequence in a topology
395# Output format example: [ { 'sequence': 'VNLTT', 'indices': [1, 2, 3, 4, 5] }, ... ]
396def get_parsed_chains (structure : 'Structure') -> list:
397 parsed_chains = []
398 chains = structure.chains
399 for chain in chains:
400 name = chain.name
401 sequence = ''
402 residue_indices = []
403 for residue in chain.residues:
404 letter = protein_residue_name_to_letter(residue.name)
405 sequence += letter
406 residue_indices.append(residue.index)
407 sequence_object = { 'name': name, 'sequence': sequence, 'residue_indices': residue_indices }
408 parsed_chains.append(sequence_object)
409 return parsed_chains
411# Align two aminoacid sequences
412# Return a list with the reference residue indexes (values)
413# which match each new sequence residues indexes (indexes)
414# Return also the score of the alignment
415# Return None when there is not valid alignment at all
416# Set verbose = True to see a visual summary of the sequence alignments in the logs
417def align (ref_sequence : str, new_sequence : str, verbose : bool = False) -> Optional[ tuple[list, float] ]:
419 #print('- REFERENCE\n' + ref_sequence + '\n- NEW\n' + new_sequence)
421 # If the new sequence is all 'X' stop here, since this would make the alignment infinite
422 # Then an array filled with None is returned
423 if all([ letter == 'X' for letter in new_sequence ]):
424 return None
426 # Return the new sequence as best aligned as possible with the reference sequence
427 #alignments = pairwise2.align.localds(ref_sequence, new_sequence, MatrixInfo.blosum62, -10, -0.5)
428 # DANI: Habría que hacerlo de esta otra forma según el deprecation warning (arriba hay más código)
429 # DANI: El problema es que el output lo tiene todo menos la sequencia en formato alienada
430 # DANI: i.e. formato '----VNLTT' (con los gaps identificados con guiones), que es justo el que necesito
431 # Actualized code for biopython >= 1.80
432 alignments = aligner.align(ref_sequence, new_sequence)
434 # In case there are no alignments it means the current chain has nothing to do with this reference
435 # Then an array filled with None is returned
436 if len(alignments) == 0:
437 return None
439 # Several alignments may be returned, specially when it is a difficult or impossible alignment
440 # Output format example: '----VNLTT'
441 best_alignment = alignments[0]
442 best_alignment_with_gaps = add_leading_and_trailing_gaps(best_alignment)
443 # Extract the allignment sequences
444 #aligned_ref, aligned_sequence = best_alignment_with_gaps.sequences
445 aligned_ref = best_alignment_with_gaps[0] # AGUS: Puede ser útil en el futuro
446 aligned_sequence = best_alignment_with_gaps[1]
447 # Obtain the score of the alignment
448 score = best_alignment.score
449 # Calculate the normalized score
450 normalized_score = score / len(new_sequence)
452 # If the normalized score does not reaches the minimum we consider the alignment is not valid
453 # It may happen when the reference goes for a specific chain but we must map all chains
454 # This 1 has been found experimentally
455 # Non maching sequence may return a 0.1-0.3 normalized score
456 # Matching sequence may return >4 normalized score
457 if normalized_score < 2:
458 print(' Not valid alignment')
459 return None
461 # Tell the user about the success
462 beautiful_normalized_score = round(normalized_score * 100) / 100
463 print(f' Valid alignment -> Normalized score = {beautiful_normalized_score}')
465 # Match each residue
466 aligned_mapping = []
467 aligned_index = 0
468 for l, letter in enumerate(aligned_sequence):
469 # Guions are skipped
470 if letter == '-':
471 continue
472 # Get the current residue of the new sequence
473 equivalent_letter = new_sequence[aligned_index]
474 if not letter == equivalent_letter:
475 raise SystemExit(f'Something was wrong at position {l} :S')
476 # 'X' residues cannot be mapped since reference sequences should never have any 'X'
477 if letter == 'X':
478 aligned_mapping.append(None)
479 aligned_index += 1
480 continue
481 # Otherwise add the equivalent aligned index to the mapping
482 # WARNING: Add +1 since uniprot residue counts start at 1, not 0
483 aligned_mapping.append(l + 1)
484 aligned_index += 1
486 return aligned_mapping, normalized_score
488# Given an aminoacids sequence, return a list of uniprot ids
489# Note that we are blasting against UniProtKB / Swiss-Prot so results will always be valid UniProt accessions
490# WARNING: This always means results will correspond to curated entries only
491# If your sequence is from an exotic organism the result may be not from it but from other more studied organism
492# Since this function may take some time we always cache the result
493def blast (sequence : str, cache : Optional['Cache'] = None) -> Optional[str]:
494 # Check if we have the resulted saved in cache already
495 if cache != None:
496 cached_blasts = cache.retrieve('blasts', {})
497 if sequence in cached_blasts:
498 accession = cached_blasts[sequence]
499 print(f'Using previous blast result from cache: {accession}')
500 return accession
501 print('Throwing blast...')
502 result = NCBIWWW.qblast(
503 program = "blastp",
504 database = "swissprot", # UniProtKB / Swiss-Prot
505 sequence = sequence,
506 )
507 parsed_result = xmltodict.parse(result.read())
508 hits = parsed_result['BlastOutput']['BlastOutput_iterations']['Iteration']['Iteration_hits']
509 # When there is no result return None
510 # Note that this is possible although hardly unprobable
511 if not hits:
512 return None
513 # Get the first result only
514 result = hits['Hit'][0]
515 # Return the accession
516 # DANI: Si algun día tienes problemas porque te falta el '.1' al final del accession puedes sacarlo de Hit_id
517 accession = result['Hit_accession']
518 print('Result: ' + accession)
519 # Save the result in the cache
520 if cache != None:
521 cached_blasts[sequence] = accession
522 cached_blasts = cache.update('blasts', cached_blasts)
523 return accession
525# Given a uniprot accession, use the MDposit API to request its data in case it is already in the database
526# If the reference is not found or there is any error then return None and keep going, we do not really need this
527def get_mdposit_reference (uniprot_accession : str, database_url : str) -> Optional[dict]:
528 # Request MDposit
529 request_url = f'{database_url}rest/v1/references/{uniprot_accession}'
530 try:
531 with urllib.request.urlopen(request_url) as response:
532 parsed_response = json.loads(response.read().decode("utf-8"))
533 # If the accession is not found in the database then we stop here
534 except urllib.error.HTTPError as error:
535 # If the uniprot accession is not yet in the MDposit references then return None
536 if error.code == 404:
537 return None
538 else:
539 warn(f'Something went wrong with the MDposit request {request_url} (error {error.code})')
540 warn(f'Protein reference {uniprot_accession} will be made again even if it exists')
541 return None
542 # This error may occur if there is no internet connection
543 except urllib.error.URLError as error:
544 # The error variable as is do not work properly
545 # Its 'errno' value is None (at least for tiemout errors)
546 actual_error = error.args[0]
547 if actual_error.errno == 110:
548 print('Timeout error when requesting MDposit. Is the node fallen?')
549 warn('Something went wrong with the MDposit request ' + request_url)
550 warn(f'Protein reference {uniprot_accession} will be made again even if it exists')
551 return None
552 return parsed_response
554# Given a uniprot accession, use the uniprot API to request its data and then mine what is needed for the database
555def get_uniprot_reference (uniprot_accession : str) -> Optional[dict]:
556 # Request Uniprot
557 request_url = 'https://www.ebi.ac.uk/proteins/api/proteins/' + uniprot_accession
558 parsed_response = None
559 request = urllib.request.Request(request_url)
560 # One day the API was returning a 'Secure Connection Failed' error and this header fixed the problem
561 request.add_header('Referer', 'https://www.uniprot.org/')
562 try:
563 with urllib.request.urlopen(request) as response:
564 parsed_response = json.loads(response.read().decode("utf-8"))
565 # If the accession is not found in UniProt then the id is not valid
566 except urllib.error.HTTPError as error:
567 if error.code == 404:
568 warn('Cannot find UniProt entry for accession ' + uniprot_accession)
569 return None
570 print('Error when requesting ' + request_url)
571 raise RuntimeError(f'Something went wrong with the Uniprot request (error {error.code})')
572 except:
573 print('Error when requesting ' + request_url)
574 raise RuntimeError(f'Something went very wrong with the Uniprot request')
575 # If we have not a response at this point then it may mean we are trying to access an obsolete entry (e.g. P01607)
576 if parsed_response == None:
577 warn('Cannot find UniProt entry for accession ' + uniprot_accession)
578 return None
579 # Get the full protein name
580 protein_data = parsed_response['protein']
581 protein_name_data = protein_data.get('recommendedName', None)
582 # DANI: It is possible that the 'recommendedName' is missing if it is not a reviewed UniProt entry
583 if not protein_name_data:
584 warn(f'The UniProt accession {uniprot_accession} is missing the recommended name. You should consider changing the reference.')
585 protein_name_data = protein_data.get('submittedName', None)[0]
586 if not protein_name_data:
587 raise ValueError('Unexpected structure in UniProt response for accession ' + uniprot_accession)
588 protein_name = protein_name_data['fullName']['value']
589 # Get the gene names as a single string
590 gene_names = []
591 # WARNING: Some uniprot entries are missing gene names (e.g. P00718)
592 # WARNING: Some uniprot entries have names in non-canonical keys (e.g. orfNames, olnNames)
593 # These names are not shown event in the uniprot web page so we also skip them
594 genes = parsed_response.get('gene', [])
595 for gene in genes:
596 gene_name = gene.get('name', None)
597 if not gene_name:
598 continue
599 gene_names.append(gene_name['value'])
600 gene_names = ', '.join(gene_names) if len(gene_names) > 0 else None
601 # Get the organism name
602 organism = parsed_response['organism']['names'][0]['value']
603 # Get the aminoacids sequence
604 sequence = parsed_response['sequence']['sequence']
605 # Get interesting regions to be highlighted in the client
606 domains = []
607 # WARNING: Some uniprot entries are missing features (e.g. O27908)
608 features = parsed_response.get('features', [])
609 # Get comments data which may be useful to further build domain descriptions
610 comments_data = parsed_response.get('comments', None)
611 # There are many types of features but in this case we will focus on domanins and similar
612 target_types = ['CHAIN', 'REGION', 'DOMAIN', 'MOTIF', 'SITE']
613 for feature in features:
614 # Skip features of other types
615 if feature['type'] not in target_types:
616 continue
617 # Get the domain name
618 name = feature['description']
619 # Build the domain description from the coments data
620 description = None
621 if comments_data:
622 comments = [ comment for comment in parsed_response['comments'] if name == comment.get('molecule', None) ]
623 comment_text = []
624 for comment in comments:
625 if not comment.get('text', False): continue
626 text = comment.get('text', None)
627 if text == None: raise ValueError('Unexpected UniProt response format: no text in comment')
628 # DANI: el comment 'text' casi siempre es una lista
629 # DANI: solo tengo constancia de una vez en que era un string directamente
630 # DANI: en uno de los comentarios de https://www.ebi.ac.uk/proteins/api/proteins/Q15465
631 if type(text) == str: comment_text.append(text)
632 elif type(text) == list: comment_text.append(text[0]['value'])
633 else: raise ValueError('Unexpected UniProt response format: text in comment is neither str or list')
634 description = '\n\n'.join(comment_text)
635 # Set the domain selection
636 # The domain 'begin' and 'end' values may include non-numeric symbols such as '~', '>' or '<'
637 # These values are usually ignored or replaced by '?' in the UniProt web client
638 # There may be not numeric value at all (e.g. Q99497)
639 # In these cases uniprot shows the domain but it does not allow to use its functionallities
640 # e.g. you can not blast using the domain sequence
641 # It makes not sense having a domain with unkown range to me so we skip these domains
642 begin = feature['begin'].replace('~', '').replace('>', '').replace('<', '')
643 end = feature['end'].replace('~', '').replace('>', '').replace('<', '')
644 if begin == '' or end == '':
645 continue
646 selection = begin + '-' + end
647 # If we already have a domain with the same name then join both domains
648 # For instance, you may have several repetitions of the 'Disordered' region
649 already_existing_domain = next((domain for domain in domains if domain['name'] == name), None)
650 if already_existing_domain:
651 already_existing_domain['selection'] += ', ' + selection
652 continue
653 # Otherwise, create a new domain
654 domain = {
655 'name': name,
656 'selection': selection
657 }
658 # Add a description only if we succesfully mined it
659 # Note that only features tagged as CHAIN have comments (not sure about this)
660 if description:
661 domain['description'] = description
662 # Add the new domain to the list
663 domains.append(domain)
664 # Mine protein functions from Gene Ontology references
665 # Get database references
666 db_references = parsed_response['dbReferences']
667 # Get references from Gene Ontology (GO) only
668 go_references = [ ref for ref in db_references if ref['type'] == 'GO' ]
669 # A Gene Ontology entry may be one of three types:
670 # Cellular Component (C), Molecular Function (F) and Biological Process (P)
671 # In this case we are interested in protein function only so we will keep those with the 'F' header
672 functions = [ ref['properties']['term'][2:] for ref in go_references if ref['properties']['term'][0:2] == 'F:' ]
673 # Set the final reference to be uploaded to the database
674 return {
675 'name': protein_name,
676 'gene': gene_names,
677 'organism': organism,
678 'uniprot': uniprot_accession,
679 'sequence': sequence,
680 'domains': domains,
681 'functions': functions
682 }
684# Given a pdb Id, get its uniprot id
685# e.g. 6VW1 -> Q9BYF1, P0DTC2, P59594
686def pdb_to_uniprot (pdb_id : str) -> list[ str | NoReferableException ]:
687 # Set the request query
688 query = '''query ($id: String!) {
689 entry(entry_id: $id) {
690 polymer_entities {
691 rcsb_polymer_entity_container_identifiers { uniprot_ids }
692 rcsb_entity_source_organism { scientific_name }
693 entity_poly { pdbx_seq_one_letter_code }
694 }
695 }
696 }'''
697 # Request PDB data
698 parsed_response = request_pdb_data(pdb_id, query)
699 # Mine data
700 uniprot_ids = []
701 for polymer in parsed_response['polymer_entities']:
702 identifier = polymer['rcsb_polymer_entity_container_identifiers']
703 # Get the uniprot
704 uniprots = identifier.get('uniprot_ids', None)
705 # If there are not UniProts at all in this entity skip to the next
706 if not uniprots:
707 # However, it may happen the the polymer organism is set as synthetic construct
708 # In this case we keep the sequence and return a no referable exception
709 organisms = polymer.get('rcsb_entity_source_organism', None)
710 if not organisms: raise ValueError('Missing organism')
711 # Get scientific names in lower caps since sometimes they are all upper caps
712 scientific_names = set(
713 [ (organism.get('scientific_name') or '').lower() for organism in organisms ])
714 if 'synthetic construct' in scientific_names:
715 entity = polymer.get('entity_poly', None)
716 if not entity: raise ValueError(f'Missing entity in {pdb_id}')
717 sequence = entity.get('pdbx_seq_one_letter_code', None)
718 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}')
719 # WARNING: A syntetic construct may have a "special" sequence
720 # It may combine one-letter code with 3-letter code in parenthesis for special aminoacids
721 # e.g. 5JMO, entity 3 -> (DKA)RVK(AR7)(0QE)
722 # We simply replace these special aminoacids by X
723 sequence = re.sub(r'\([0-9A-Z]{3}\)', 'X', sequence)
724 uniprot_ids.append( NoReferableException(sequence) )
725 continue
726 # Check if the sequence of this chain may belong to an antibody
727 entity = polymer.get('entity_poly', None)
728 if not entity: continue
729 sequence = entity.get('pdbx_seq_one_letter_code', None)
730 if not sequence: continue
731 print(' Could this be an antibody?')
732 is_antibody = False
733 for reference_sequence in REFERENCE_ANTIBODY_SEQUENCES:
734 if align(reference_sequence, sequence):
735 is_antibody = True
736 break
737 conclusion = f' I would guess it {"is" if is_antibody else "is not"} an antibody'
738 print(conclusion)
739 # If so, the also set this chain as no referable since antibodies have no UniProt id
740 if is_antibody:
741 uniprot_ids.append( NoReferableException(sequence) )
742 continue
743 # If we did not fall in any of the previous sections then continue
744 continue
745 # If we have multiple uniprots in a single entity then we skip them
746 # Note tha they belong to an entity which is no referable (e.g. a chimeric entity)
747 # See 5GY2 and 7E2Z labeled as chimeric entities and 6e67, which is not labeled likewise
748 if len(uniprots) > 1:
749 warn(f'Multiple UniProt ids in the same entity in {pdb_id} -> Is this a chimeric entity?')
750 entity = polymer.get('entity_poly', None)
751 if not entity: raise ValueError(f'Missing entity in {pdb_id}')
752 sequence = entity.get('pdbx_seq_one_letter_code', None)
753 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}')
754 uniprot_ids.append( NoReferableException(sequence) )
755 continue
756 uniprot_id = uniprots[0]
757 uniprot_ids.append(uniprot_id)
758 actual_uniprot_ids = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == str ]
759 if len(actual_uniprot_ids) > 0:
760 print(f' UniProt ids for PDB id {pdb_id}: ' + ', '.join(actual_uniprot_ids))
761 else: print(f' No UniProt ids were found for PDB id {pdb_id}')
762 no_refs = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == NoReferableException ]
763 no_refs_count = len(no_refs)
764 if no_refs_count > 0:
765 print(f' Also encountered {no_refs_count} no refereable sequences for PDB id {pdb_id}')
766 return uniprot_ids
768# This function is used by the generate_metadata script
769# 1. Get structure sequences
770# 2. Calculate which reference domains are covered by the previous sequence
771# 3. In case it is a covid spike, align the previous sequence against all saved variants (they are in 'utils')
772from mddb_workflow.resources.covid_variants import covid_spike_variants
773def get_sequence_metadata (structure : 'Structure', protein_references_file : 'File', residue_map : dict) -> dict:
774 # Mine sequences from the structure
775 sequences = []
776 # Classify sequences according to if they belong to protein or nucleic sequences
777 # WARNING: We are interested in unique sequence BUT we also want to keep the order
778 # WARNING: Do NOT use sets here to the order of appearance in the structure is respected
779 protein_sequences = []
780 nucleic_sequences = []
781 # Iterate structure chains
782 for chain in structure.chains:
783 # Get the current chain sequence and add it to the list
784 sequence = chain.get_sequence()
785 sequences.append(sequence)
786 # Get the chain classification
787 classification = chain.get_classification()
788 # Depending on what it is, add the sequence also in the corresponding list
789 if classification == 'protein' and sequence not in protein_sequences:
790 protein_sequences.append(sequence)
791 elif (classification == 'dna' or classification == 'rna') and sequence not in nucleic_sequences:
792 nucleic_sequences.append(sequence)
793 # Get values from the residue map
794 # Get protein references from the residues map
795 reference_ids = []
796 references = residue_map['references']
797 if references and len(references) > 0:
798 for ref, ref_type in zip(references, residue_map['reference_types']):
799 if ref_type == 'protein':
800 reference_ids.append(ref)
801 residue_reference_numbers = residue_map['residue_reference_numbers']
802 residue_reference_indices = residue_map['residue_reference_indices']
803 # Load references data, which should already be save to the references data file
804 references_data = import_references(protein_references_file) if protein_references_file.exists else {}
805 # In case we have the SARS-CoV-2 spike among the references, check also which is the variant it belongs to
806 # DANI: Esto es un parche. En un futuro buscaremos una manera mejor de comprovar variantes en cualquier contexto
807 variant = None
808 covid_spike_reference_id = 'P0DTC2'
809 if covid_spike_reference_id in reference_ids:
810 # Load the reference data and find a chain which belongs to the spike
811 # We consider having only one variant, so all chains should return the same result
812 reference_data = references_data[covid_spike_reference_id]
813 covid_spike_reference_index = reference_ids.index(covid_spike_reference_id)
814 sample_residue_index = next((residue_index for residue_index, reference_index in enumerate(residue_reference_indices) if reference_index == covid_spike_reference_index), None)
815 if sample_residue_index == None:
816 raise SystemExit('Failed to find residue belonging to SARS-CoV-2 variant')
817 sample_chain_sequence = structure.residues[sample_residue_index].chain.get_sequence()
818 # Align the sample chain sequence against all variants to find the best match
819 highest_score = 0
820 print('Finding SARS-CoV-2 variant')
821 for variant_name, variant_sequence in covid_spike_variants.items():
822 print(f' Trying with {variant_name}')
823 align_results = align(variant_sequence, sample_chain_sequence)
824 if not align_results:
825 continue
826 mapping, score = align_results
827 if score > highest_score:
828 highest_score = score
829 variant = variant_name
830 # At this point there should be a result
831 if not variant:
832 raise SystemExit('Something went wrong trying to find the SARS-CoV-2 variant')
833 print(f'It is {variant}')
834 # Set which reference domains are present in the structure
835 domains = []
836 for reference_index, reference_id in enumerate(reference_ids):
837 # If this is not referable then there are no domains to mine
838 if reference_id == NO_REFERABLE_FLAG:
839 continue
840 # If the reference was not found then there are no domains to mine
841 if reference_id == NOT_FOUND_FLAG:
842 continue
843 # Get residue numbers of residues which belong to the current reference in th residue map
844 residue_numbers = []
845 for residue_index, residue_reference_index in enumerate(residue_reference_indices):
846 if residue_reference_index == reference_index:
847 residue_numbers.append(residue_reference_numbers[residue_index])
848 # Set a function to find if any of the residue numbers is within a given range
849 def in_range (start : int, end : int) -> bool:
850 for residue_number in residue_numbers:
851 if residue_number >= start and residue_number <= end:
852 return True
853 return False
854 # Get the reference data for the current reference uniprot id
855 reference_data = references_data.get(reference_id, None)
856 if not reference_data:
857 raise SystemExit(reference_id + ' is not in the references data file')
858 # Iterate over data domains
859 for domain in reference_data['domains']:
860 selection = domain['selection']
861 # Iterate over the residue ranges in the selection field
862 residue_ranges = selection.split(', ')
863 for residue_range in residue_ranges:
864 start, end = [ int(num) for num in residue_range.split('-') ]
865 # If this range includes any resiudes in the residue map then the domain is included in the list
866 if in_range(start, end):
867 domains.append(domain['name'])
868 break
869 # Return the sequence matadata
870 return {
871 'sequences': sequences,
872 'protein_sequences': protein_sequences,
873 'nucleic_sequences': nucleic_sequences,
874 'domains': domains,
875 'cv19_variant': variant
876 }