Coverage for mddb_workflow / tools / generate_map.py: 57%
493 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1import sys
2import json
3import urllib.request
4import numpy as np
5import re
7from mddb_workflow.utils.auxiliar import protein_residue_name_to_letter, NoReferableException
8from mddb_workflow.utils.auxiliar import InputError, warn, load_json, save_json, request_pdb_data
9from mddb_workflow.utils.constants import REFERENCE_SEQUENCE_FLAG, NO_REFERABLE_FLAG, NOT_FOUND_FLAG
10from mddb_workflow.utils.file import File
11from mddb_workflow.utils.type_hints import *
13import xmltodict
15from Bio import Align
16from Bio.Blast import NCBIWWW
17from Bio.Align import substitution_matrices, Alignment
19# Set generic sequences which should be similar to known antibodies
20REFERENCE_ANTIBODY_SEQUENCES = {
21 # Single domain antibody
22 # This sequence has been designed with the aid of DeepSeek
23 # CDR regionds have been replaced by X
24 'QVQLVESGGGLVQPGGSLRLSCAASXXXXXXXWYRQAPGKEREFVAXXXXXXRFTISRDNAKNTVYLQMNSLKPEDTAVYYCXXXXXXXXXXWGQGTQVTVSS'
25}
28# Save current stderr to further restore it
29stderr_backup = sys.stderr
30# Suppress stderr
31#sys.stderr = None
32# Import the function
33#from Bio.SubsMat import MatrixInfo
34# Restore stderr
35#sys.stderr = stderr_backup
37# AGUS: Necesitamos que la secuencia de aa alineada tenga gaps identificados con guiones y pairwaise2 (biopython < 1.80) no lo hace
38# AGUS: Para actualizar a biopython >= 1.80 pairwaise2 ya no existe y no podemos obtener el mismo outoput que necesitábamos
39# AGUS: Esta función parece resolver el problema: https://github.com/biopython/biopython/issues/4769
40# Set the aligner
41aligner = Align.PairwiseAligner(mode='local')
42aligner.substitution_matrix = substitution_matrices.load("BLOSUM62")
43aligner.open_gap_score = -10
44aligner.extend_gap_score = -0.5
45def add_leading_and_trailing_gaps(alignment: Alignment) -> Alignment:
46 coords = alignment.coordinates
48 # We need to add two columns on each side of the coordinates matrix.
49 # The first column will always point to the start of the sequences,
50 # hence filled with zeros.
51 new_coords = np.zeros((2, coords.shape[1] + 4), dtype=int)
53 # The last w column will always point to the end of sequences,
54 # hence filled with the length of the respective sequences.
55 target_len = len(alignment.sequences[0])
56 query_len = len(alignment.sequences[1])
57 last_col = np.array([target_len, query_len])
58 new_coords[:, -1] = last_col
60 # The middle columns will contain the original coordinates.
61 new_coords[:, 2:-2] = coords
63 # The second and one before last columns will point to the end of sequence,
64 # with less unaligned positions.
65 new_coords[:, 1] = coords[:, 0] - coords[:, 0].min()
66 new_coords[:, -2] = coords[:, -1] + (last_col - coords[:, -1]).min()
67 return Alignment(sequences=alignment.sequences, coordinates=new_coords)
69# Map the structure aminoacids sequences against the standard reference sequences
70# References are uniprot accession ids and they are optional
71# For each reference, align the reference sequence with the topology sequence
72# Chains which do not match any reference sequence will be blasted
73# Note that an internet connection is required both to retireve the uniprot reference sequence and to do the blast
74# NEVER FORGET: This system relies on the fact that topology chains are not repeated
75def generate_protein_mapping (
76 structure : 'Structure',
77 protein_references_file : 'File',
78 database : 'Database',
79 cache : 'Cache',
80 register : dict,
81 mercy : list[str] = [],
82 input_protein_references : list | dict = [],
83 pdb_ids : list[str] = [],
84) -> dict:
85 """ Map the structure aminoacids sequences against the Uniprot reference sequences. """
86 # Remove previous warnings, if any
87 register.remove_warnings(REFERENCE_SEQUENCE_FLAG)
88 # Forced references must be list or dict
89 # If it is none then we set it as an empty list
90 if input_protein_references == None:
91 input_protein_references = []
92 # If forced references is a list of dictionaries then it means the input is wrongly formatted
93 # This may happen since the inputs file is in YAML format, and a simple hyphen makes the difference
94 # We can fix it from here anyway
95 if type(input_protein_references) == list and len(input_protein_references) > 0 and type(input_protein_references[0]) == dict:
96 input_protein_references = { k: v for fr in input_protein_references for k, v in fr.items() }
97 # Check if the forced references are strict (i.e. reference per chain, as a dictionary) or flexible (list of references)
98 strict_references = type(input_protein_references) == dict
99 # Check the "no referable" flag not to be passed when references are not strict
100 if not strict_references and NO_REFERABLE_FLAG in input_protein_references:
101 raise InputError(' The "no referable" flag cannot be passed in a list.' \
102 f' You must use a chain keys dictionary (e.g. {"A":"{NO_REFERABLE_FLAG}"})')
103 # Store all the references which are got through this process
104 # Note that not all references may be used at the end
105 references = {}
106 # Given a uniprot accession, get the reference object
107 # Try first asking to the MDposit database in case the reference exists already
108 # If not, retrieve UniProt data and build the reference object
109 # Return also a boolean to set if the reference already existed (True) or not (False)
110 def get_reference (uniprot_accession : str) -> tuple[dict, bool]:
111 reference = references.get(uniprot_accession, None)
112 if reference:
113 return reference, True
114 reference = database.get_reference_data('proteins', uniprot_accession)
115 if reference:
116 return reference, True
117 reference = get_uniprot_reference(uniprot_accession)
118 return reference, False
119 # Import local references, in case the references json file already exists
120 imported_references = None
121 if protein_references_file.exists:
122 imported_references = import_references(protein_references_file)
123 # Append the imported references to the overall references pool
124 for k,v in imported_references.items():
125 references[k] = v
126 # Get the structure chain sequences
127 parsed_chains = get_parsed_chains(structure)
128 # Find out which chains are protein
129 protein_parsed_chains = []
130 for chain_data in parsed_chains:
131 sequence = chain_data['sequence']
132 if next((letter for letter in sequence if letter != 'X'), None):
133 chain_data['match'] = { 'ref': None, 'map': None, 'score': 0 }
134 protein_parsed_chains.append(chain_data)
135 # If there are no protein sequences then there is no need to map anything
136 if len(protein_parsed_chains) == 0:
137 print(' There are no protein sequences')
138 return protein_parsed_chains
139 # For each input forced reference, get the reference sequence
140 reference_sequences = {}
141 # Save already tried alignments to not repeat the alignment further
142 tried_alignments = { chain_data['name']: [] for chain_data in protein_parsed_chains }
143 # Set a function to try to match all protein sequences with the available reference sequences
144 # In case of match, objects in the 'protein_parsed_chains' list are modified by adding the result
145 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not
146 def match_sequences () -> bool:
147 # Track each chain-reference alignment match and keep the score of successful alignments
148 # Now for each structure sequence, align all reference sequences and keep the best alignment (if it meets the minimum)
149 for chain_data in protein_parsed_chains:
150 chain = chain_data['name']
151 chain_tried_alignments = tried_alignments[chain]
152 # In case references are forced per chain check if there is a reference for this chain and match according to this
153 if strict_references:
154 # Get the forced specific chain for this sequence, if any
155 forced_reference = input_protein_references.get(chain, None)
156 if forced_reference:
157 # If the chain has a specific forced reference then we must align it just once
158 # Skip this process in further matches
159 if chain_data['match']['ref']:
160 continue
161 # In case the forced reference is the "no referable" flag
162 # Thus it has no reference sequence and we must not try to match it
163 # Actually, any match would be accidental and not correct
164 if forced_reference == NO_REFERABLE_FLAG:
165 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG }
166 continue
167 # In case the forced reference is the "not found" flag
168 # This should not happend but we may be using references as forced references, so just in case
169 if forced_reference == NOT_FOUND_FLAG:
170 chain_data['match'] = { 'ref': NOT_FOUND_FLAG }
171 continue
172 # Get the forced reference sequence and align it to the chain sequence in order to build the map
173 reference_sequence = reference_sequences[forced_reference]
174 print(f' Aligning chain {chain} with {forced_reference} reference sequence')
175 align_results = align(reference_sequence, chain_data['sequence'])
176 # The align must match or we stop here and warn the user
177 if not align_results:
178 raise InputError(f'Forced reference {chain} -> {forced_reference} does not match in sequence')
179 sequence_map, align_score = align_results
180 reference = references[forced_reference]
181 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score }
182 continue
183 # If the chain has already a match and this match is among the forced references then stop here
184 # Forced references have priority and this avoids having a match with a not forced reference further
185 # Same behaviour if the match is with an unreferable sequence
186 if chain_data['match']['ref'] and ( chain_data['match']['ref'] == NO_REFERABLE_FLAG
187 or chain_data['match']['ref'] == NOT_FOUND_FLAG
188 or chain_data['match']['ref']['uniprot'] in input_protein_references):
189 continue
190 # Iterate over the different available reference sequences
191 for uniprot_id, reference_sequence in reference_sequences.items():
192 # If this alignment has been tried already then skip it
193 if uniprot_id in chain_tried_alignments:
194 continue
195 # Align the structure sequence with the reference sequence
196 # NEVER FORGET: This system relies on the fact that topology chains are not repeated
197 print(f' Aligning chain {chain} with {uniprot_id} reference sequence')
198 align_results = align(reference_sequence, chain_data['sequence'])
199 tried_alignments[chain].append(uniprot_id) # Save the alignment try, no matter if it works or not
200 if not align_results:
201 continue
202 # In case we have a valid alignment, check the alignment score is better than the current reference score (if any)
203 sequence_map, align_score = align_results
204 current_reference = chain_data['match']
205 if current_reference['score'] > align_score:
206 continue
207 # If the match is a 'no referable' exception then set a no referable flag
208 if type(uniprot_id) == NoReferableException:
209 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG }
210 continue
211 # Proceed to set the corresponding reference toherwise
212 reference = references[uniprot_id]
213 # If the alignment is better then we impose the new reference
214 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score }
215 # Sum up the current matching
216 print(' Protein reference summary:')
217 for chain_data in parsed_chains:
218 name = chain_data['name']
219 match = chain_data.get('match', None)
220 if not match:
221 print(f' {name} -> Not protein')
222 continue
223 reference = chain_data['match'].get('ref', None)
224 if not reference:
225 print(f' {name} -> ¿?')
226 continue
227 if reference == NO_REFERABLE_FLAG:
228 print(f' {name} -> No referable')
229 continue
230 if reference == NOT_FOUND_FLAG:
231 print(f' {name} -> Not found')
232 continue
233 uniprot_id = reference['uniprot']
234 print(f' {name} -> {uniprot_id}')
235 # Export already matched references
236 export_references(protein_parsed_chains, protein_references_file)
237 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not
238 allright = all([ chain_data['match']['ref'] for chain_data in protein_parsed_chains ])
239 # If we match all chains then make sure there is no forced reference missing which did not match
240 # Otherwise stop here and force the user to remove these forced uniprot ids from the inputs file
241 # WARNING: Although we could move on it is better to stop here and warn the user to prevent a future silent problem
242 if allright and input_protein_references:
243 # Get forced uniprot ids
244 forced_uniprot_ids = set(list(input_protein_references.values()) if strict_references else input_protein_references)
245 forced_uniprot_ids -= { NOT_FOUND_FLAG, NO_REFERABLE_FLAG }
246 #forced_uniprot_ids.remove(NO_REFERABLE_FLAG)
247 #forced_uniprot_ids.remove(NOT_FOUND_FLAG)
248 # Get matched uniprot ids
249 matched_references = [ chain_data['match']['ref'] for chain_data in protein_parsed_chains ]
250 matched_uniprot_ids = set([ ref['uniprot'] for ref in matched_references if type(ref) == dict ])
251 # Check the difference
252 unmatched_uniprot_ids = forced_uniprot_ids - matched_uniprot_ids
253 if len(unmatched_uniprot_ids) > 0:
254 log = ', '.join(unmatched_uniprot_ids)
255 raise InputError(f'Some forced references were not matched with any protein sequence: {log}\n'
256 ' Please remove them from the inputs file')
257 return allright
258 # --- End of match_sequences function --------------------------------------------------------------------------------
259 # First use the forced references for the matching
260 if input_protein_references:
261 forced_uniprot_ids = list(input_protein_references.values()) if strict_references else input_protein_references
262 for uniprot_id in forced_uniprot_ids:
263 # If instead of a uniprot id there is a 'no referable' flag then we skip this process
264 if uniprot_id == NO_REFERABLE_FLAG:
265 continue
266 # If instead of a uniprot id there is a 'not found' flag then we skip this process
267 # This should not happend but we may be using references as forced references, so just in case
268 if uniprot_id == NOT_FOUND_FLAG:
269 continue
270 # If reference is already in the list (i.e. it has been imported) then skip this process
271 reference = references.get(uniprot_id, None)
272 if reference:
273 reference_sequences[uniprot_id] = reference['sequence']
274 continue
275 # Find the reference data for the given uniprot id
276 reference, already_loaded = get_reference(uniprot_id)
277 reference_sequences[uniprot_id] = reference['sequence']
278 # Save the current whole reference object for later
279 references[reference['uniprot']] = reference
280 # Now that we have all forced references data perform the matching
281 # If we have every protein chain matched with a reference then we stop here
282 print(' Using only forced references from the inputs file')
283 if match_sequences():
284 return protein_parsed_chains
285 # Now add the imported references to reference sequences. Thus now they will be 'matchable'
286 # Thus now they will be 'matchable', so try to match sequences again in case any of the imported references has not been tried
287 # It was not done before since we want forced references to have priority
288 if imported_references:
289 need_rematch = False
290 for uniprot_id, reference in imported_references.items():
291 # If the imported reference has been aligned already (i.e. it was a forced reference)
292 if uniprot_id in reference_sequences:
293 continue
294 # Otherwise, include it
295 need_rematch = True
296 reference_sequences[uniprot_id] = reference['sequence']
297 # If there was at least one imported reference missing then rerun the matching
298 if need_rematch:
299 print(' Using also references imported from references.json')
300 if match_sequences():
301 return protein_parsed_chains
302 # If there are still any chain which is not matched with a reference then we need more references
303 # To get them, retrieve all uniprot ids associated to the pdb ids, if any
304 if pdb_ids and len(pdb_ids) > 0:
305 for pdb_id in pdb_ids:
306 # Ask PDB
307 uniprot_ids = pdb_to_uniprot(pdb_id)
308 for uniprot_id in uniprot_ids:
309 # If this is not an actual UniProt, but a no referable exception, then handle it
310 # We must find the matching sequence and set the corresponding chain as no referable
311 if type(uniprot_id) == NoReferableException:
312 reference_sequences[uniprot_id] = uniprot_id.sequence
313 continue
314 # Build a new reference from the resulting uniprot
315 reference, already_loaded = get_reference(uniprot_id)
316 if reference == None:
317 continue
318 reference_sequences[reference['uniprot']] = reference['sequence']
319 # Save the current whole reference object for later
320 references[reference['uniprot']] = reference
321 # If we have every protein chain matched with a reference already then we stop here
322 print(' Using also references related to PDB ids from the inputs file')
323 if match_sequences():
324 return protein_parsed_chains
325 # If there are still any chain which is not matched with a reference then we need more references
326 # To get them, we run a blast with each orphan chain sequence
327 for chain_data in protein_parsed_chains:
328 # Skip already references chains
329 if chain_data['match']['ref']:
330 continue
331 # Run the blast
332 sequence = chain_data['sequence']
333 uniprot_id = blast(sequence, cache)
334 if not uniprot_id:
335 chain_data['match'] = { 'ref': NOT_FOUND_FLAG }
336 continue
337 # Build a new reference from the resulting uniprot
338 reference, already_loaded = get_reference(uniprot_id)
339 reference_sequences[reference['uniprot']] = reference['sequence']
340 # Save the current whole reference object for later
341 references[reference['uniprot']] = reference
342 # If we have every protein chain matched with a reference already then we stop here
343 print(' Using also references from blast')
344 if match_sequences():
345 return protein_parsed_chains
346 # At this point we should have macthed all sequences
347 # If not, kill the process unless mercy was given
348 must_be_killed = REFERENCE_SEQUENCE_FLAG not in mercy
349 if must_be_killed:
350 raise InputError('BLAST failed to find a matching reference sequence for at least one protein sequence. See the warnings above.\n' + \
351 ' If your system has antibodies or synthetic constructs please consider marking these chains as "no referable" in the inputs file.\n' + \
352 ' If your system has exotic proteins whose sequences are not found in the Swiss-Prot database you may force non-curated UniProt ids.\n' + \
353 ' If your system has very exotic proteins whose sequence are not in UniProt you can use the "--mercy refseq" flag to skip this error.')
354 warn('BLAST failed to find a matching reference sequence for at least one protein sequence')
355 register.add_warning(REFERENCE_SEQUENCE_FLAG, 'There is at least one protein region which is not mapped to any reference sequence')
356 return protein_parsed_chains
358# Export reference objects data to a json file
359# This file is used by the loader to load new references to the database
360# Note that all references are saved to this json file, even those which are already in the database
361# It is the loader who is the responsible to check which references must be loaded and which ones are loaded already
362# Note that mapping data (i.e. which residue belongs to each reference) is not saved
363def export_references (mapping_data : list, protein_references_file : 'File'):
364 final_references = []
365 final_uniprots = []
366 for data in mapping_data:
367 match = data['match']
368 ref = match['ref']
369 if not ref or ref == NO_REFERABLE_FLAG or ref == NOT_FOUND_FLAG:
370 continue
371 uniprot = ref['uniprot']
372 if uniprot in final_uniprots:
373 continue
374 final_references.append(ref)
375 final_uniprots.append(uniprot)
376 # If there are no references to exported then do not egenrate the json file
377 if len(final_references) == 0:
378 return
379 # Write references to a json file
380 save_json(final_references, protein_references_file.path, indent = 4)
382# Import reference json file so we do not have to rerun this process
383def import_references (protein_references_file : 'File') -> list:
384 print(' Importing references from ' + protein_references_file.path)
385 # Read the file
386 file_content = load_json(protein_references_file.path)
387 # Format data as the process expects to find it
388 references = {}
389 for reference in file_content:
390 uniprot = reference['uniprot']
391 references[uniprot] = reference
392 return references
394# Get each chain name and aminoacids sequence in a topology
395# Output format example: [ { 'sequence': 'VNLTT', 'indices': [1, 2, 3, 4, 5] }, ... ]
396def get_parsed_chains (structure : 'Structure') -> list:
397 parsed_chains = []
398 chains = structure.chains
399 for chain in chains:
400 name = chain.name
401 sequence = ''
402 residue_indices = []
403 for residue in chain.residues:
404 letter = protein_residue_name_to_letter(residue.name)
405 sequence += letter
406 residue_indices.append(residue.index)
407 sequence_object = { 'name': name, 'sequence': sequence, 'residue_indices': residue_indices }
408 parsed_chains.append(sequence_object)
409 return parsed_chains
411# Align two aminoacid sequences
412# Return a list with the reference residue indexes (values)
413# which match each new sequence residues indexes (indexes)
414# Return also the score of the alignment
415# Return None when there is not valid alignment at all
416# Set verbose = True to see a visual summary of the sequence alignments in the logs
417def align (ref_sequence : str, new_sequence : str, verbose : bool = False) -> Optional[ tuple[list, float] ]:
419 #print('- REFERENCE\n' + ref_sequence + '\n- NEW\n' + new_sequence)
421 # If the new sequence is all 'X' stop here, since this would make the alignment infinite
422 # Then an array filled with None is returned
423 if all([ letter == 'X' for letter in new_sequence ]):
424 return None
426 # Return the new sequence as best aligned as possible with the reference sequence
427 #alignments = pairwise2.align.localds(ref_sequence, new_sequence, MatrixInfo.blosum62, -10, -0.5)
428 # DANI: Habría que hacerlo de esta otra forma según el deprecation warning (arriba hay más código)
429 # DANI: El problema es que el output lo tiene todo menos la sequencia en formato alienada
430 # DANI: i.e. formato '----VNLTT' (con los gaps identificados con guiones), que es justo el que necesito
431 # Actualized code for biopython >= 1.80
432 alignments = aligner.align(ref_sequence, new_sequence)
434 # In case there are no alignments it means the current chain has nothing to do with this reference
435 # Then an array filled with None is returned
436 if len(alignments) == 0:
437 return None
439 # Several alignments may be returned, specially when it is a difficult or impossible alignment
440 # Output format example: '----VNLTT'
441 best_alignment = alignments[0]
442 best_alignment_with_gaps = add_leading_and_trailing_gaps(best_alignment)
443 # Extract the allignment sequences
444 #aligned_ref, aligned_sequence = best_alignment_with_gaps.sequences
445 aligned_ref = best_alignment_with_gaps[0] # AGUS: Puede ser útil en el futuro
446 aligned_sequence = best_alignment_with_gaps[1]
447 # Obtain the score of the alignment
448 score = best_alignment.score
449 # Calculate the normalized score
450 normalized_score = score / len(new_sequence)
452 # If the normalized score does not reaches the minimum we consider the alignment is not valid
453 # It may happen when the reference goes for a specific chain but we must map all chains
454 # This 1 has been found experimentally
455 # Non maching sequence may return a 0.1-0.3 normalized score
456 # Matching sequence may return >4 normalized score
457 if normalized_score < 2:
458 print(' Not valid alignment')
459 return None
461 # Tell the user about the success
462 beautiful_normalized_score = round(normalized_score * 100) / 100
463 print(f' Valid alignment -> Normalized score = {beautiful_normalized_score}')
465 # Match each residue
466 aligned_mapping = []
467 aligned_index = 0
468 for l, letter in enumerate(aligned_sequence):
469 # Guions are skipped
470 if letter == '-':
471 continue
472 # Get the current residue of the new sequence
473 equivalent_letter = new_sequence[aligned_index]
474 if not letter == equivalent_letter:
475 raise RuntimeError(f'Something was wrong at position {l} :S')
476 # 'X' residues cannot be mapped since reference sequences should never have any 'X'
477 if letter == 'X':
478 aligned_mapping.append(None)
479 aligned_index += 1
480 continue
481 # Otherwise add the equivalent aligned index to the mapping
482 # WARNING: Add +1 since uniprot residue counts start at 1, not 0
483 aligned_mapping.append(l + 1)
484 aligned_index += 1
486 return aligned_mapping, normalized_score
488# Given an aminoacids sequence, return a list of uniprot ids
489# Note that we are blasting against UniProtKB / Swiss-Prot so results will always be valid UniProt accessions
490# WARNING: This always means results will correspond to curated entries only
491# If your sequence is from an exotic organism the result may be not from it but from other more studied organism
492# Since this function may take some time we always cache the result
493def blast (sequence : str, cache : Optional['Cache'] = None) -> Optional[str]:
494 # Check if we have the resulted saved in cache already
495 if cache != None:
496 cached_blasts = cache.retrieve('blasts', {})
497 if sequence in cached_blasts:
498 accession = cached_blasts[sequence]
499 print(f'Using previous blast result from cache: {accession}')
500 return accession
501 print('Throwing blast...')
502 result = NCBIWWW.qblast(
503 program = "blastp",
504 database = "swissprot", # UniProtKB / Swiss-Prot
505 sequence = sequence,
506 )
507 parsed_result = xmltodict.parse(result.read())
508 hits = parsed_result['BlastOutput']['BlastOutput_iterations']['Iteration']['Iteration_hits']
509 # When there is no result return None
510 # Note that this is possible although hardly unprobable
511 if not hits:
512 return None
513 # Get the first result only
514 # Note that when there is only one result the Hit isnot an list, but the hit itself
515 results = hits['Hit']
516 if type(results) == list: first_result = results[0]
517 elif type(results) == dict: first_result = results
518 else: raise RuntimeError('Invalid hit format')
519 # Return the accession
520 # DANI: Si algun día tienes problemas porque te falta el '.1' al final del accession puedes sacarlo de Hit_id
521 accession = first_result['Hit_accession']
522 print('Result: ' + accession)
523 # Save the result in the cache
524 if cache != None:
525 cached_blasts[sequence] = accession
526 cached_blasts = cache.update('blasts', cached_blasts)
527 return accession
529# Given a uniprot accession, use the uniprot API to request its data and then mine what is needed for the database
530def get_uniprot_reference (uniprot_accession : str) -> Optional[dict]:
531 # Request Uniprot
532 request_url = 'https://www.ebi.ac.uk/proteins/api/proteins/' + uniprot_accession
533 parsed_response = None
534 request = urllib.request.Request(request_url)
535 # One day the API was returning a 'Secure Connection Failed' error and this header fixed the problem
536 request.add_header('Referer', 'https://www.uniprot.org/')
537 try:
538 with urllib.request.urlopen(request) as response:
539 parsed_response = json.loads(response.read().decode("utf-8"))
540 # If the accession is not found in UniProt then the id is not valid
541 except urllib.error.HTTPError as error:
542 if error.code == 404:
543 warn('Cannot find UniProt entry for accession ' + uniprot_accession)
544 return None
545 print('Error when requesting ' + request_url)
546 raise RuntimeError(f'Something went wrong with the Uniprot request (error {error.code})')
547 except:
548 print('Error when requesting ' + request_url)
549 raise RuntimeError(f'Something went very wrong with the Uniprot request')
550 # If we have not a response at this point then it may mean we are trying to access an obsolete entry (e.g. P01607)
551 if parsed_response == None:
552 warn('Cannot find UniProt entry for accession ' + uniprot_accession)
553 return None
554 # Get the full protein name
555 protein_data = parsed_response['protein']
556 protein_name_data = protein_data.get('recommendedName', None)
557 # DANI: It is possible that the 'recommendedName' is missing if it is not a reviewed UniProt entry
558 if not protein_name_data:
559 warn(f'The UniProt accession {uniprot_accession} is missing the recommended name. You should consider changing the reference.')
560 protein_name_data = protein_data.get('submittedName', None)[0]
561 if not protein_name_data:
562 raise ValueError('Unexpected structure in UniProt response for accession ' + uniprot_accession)
563 protein_name = protein_name_data['fullName']['value']
564 # Get the gene names as a single string
565 gene_names = []
566 # WARNING: Some uniprot entries are missing gene names (e.g. P00718)
567 # WARNING: Some uniprot entries have names in non-canonical keys (e.g. orfNames, olnNames)
568 # These names are not shown event in the uniprot web page so we also skip them
569 genes = parsed_response.get('gene', [])
570 for gene in genes:
571 gene_name = gene.get('name', None)
572 if not gene_name:
573 continue
574 gene_names.append(gene_name['value'])
575 gene_names = ', '.join(gene_names) if len(gene_names) > 0 else None
576 # Get the organism name
577 organism = parsed_response['organism']['names'][0]['value']
578 # Get the aminoacids sequence
579 sequence = parsed_response['sequence']['sequence']
580 # Get interesting regions to be highlighted in the client
581 domains = []
582 # WARNING: Some uniprot entries are missing features (e.g. O27908)
583 features = parsed_response.get('features', [])
584 # Get comments data which may be useful to further build domain descriptions
585 comments_data = parsed_response.get('comments', None)
586 # There are many types of features but in this case we will focus on domanins and similar
587 target_types = ['CHAIN', 'REGION', 'DOMAIN', 'MOTIF', 'SITE']
588 for feature in features:
589 # Skip features of other types
590 if feature['type'] not in target_types:
591 continue
592 # Get the domain name
593 name = feature['description']
594 # Build the domain description from the coments data
595 description = None
596 if comments_data:
597 comments = [ comment for comment in parsed_response['comments'] if name == comment.get('molecule', None) ]
598 comment_text = []
599 for comment in comments:
600 if not comment.get('text', False): continue
601 text = comment.get('text', None)
602 if text == None: raise ValueError('Unexpected UniProt response format: no text in comment')
603 # DANI: el comment 'text' casi siempre es una lista
604 # DANI: solo tengo constancia de una vez en que era un string directamente
605 # DANI: en uno de los comentarios de https://www.ebi.ac.uk/proteins/api/proteins/Q15465
606 if type(text) == str: comment_text.append(text)
607 elif type(text) == list: comment_text.append(text[0]['value'])
608 else: raise ValueError('Unexpected UniProt response format: text in comment is neither str or list')
609 description = '\n\n'.join(comment_text)
610 # Set the domain selection
611 # The domain 'begin' and 'end' values may include non-numeric symbols such as '~', '>' or '<'
612 # These values are usually ignored or replaced by '?' in the UniProt web client
613 # There may be not numeric value at all (e.g. Q99497)
614 # In these cases uniprot shows the domain but it does not allow to use its functionallities
615 # e.g. you can not blast using the domain sequence
616 # It makes not sense having a domain with unkown range to me so we skip these domains
617 begin = feature['begin'].replace('~', '').replace('>', '').replace('<', '')
618 end = feature['end'].replace('~', '').replace('>', '').replace('<', '')
619 if begin == '' or end == '':
620 continue
621 selection = begin + '-' + end
622 # If we already have a domain with the same name then join both domains
623 # For instance, you may have several repetitions of the 'Disordered' region
624 already_existing_domain = next((domain for domain in domains if domain['name'] == name), None)
625 if already_existing_domain:
626 already_existing_domain['selection'] += ', ' + selection
627 continue
628 # Otherwise, create a new domain
629 domain = {
630 'name': name,
631 'selection': selection
632 }
633 # Add a description only if we succesfully mined it
634 # Note that only features tagged as CHAIN have comments (not sure about this)
635 if description:
636 domain['description'] = description
637 # Add the new domain to the list
638 domains.append(domain)
639 # Mine protein functions from Gene Ontology references
640 # Get database references
641 db_references = parsed_response['dbReferences']
642 # Get references from Gene Ontology (GO) only
643 go_references = [ ref for ref in db_references if ref['type'] == 'GO' ]
644 # A Gene Ontology entry may be one of three types:
645 # Cellular Component (C), Molecular Function (F) and Biological Process (P)
646 # In this case we are interested in protein function only so we will keep those with the 'F' header
647 functions = [ ref['properties']['term'][2:] for ref in go_references if ref['properties']['term'][0:2] == 'F:' ]
648 # Set the final reference to be uploaded to the database
649 return {
650 'name': protein_name,
651 'gene': gene_names,
652 'organism': organism,
653 'uniprot': uniprot_accession,
654 'sequence': sequence,
655 'domains': domains,
656 'functions': functions
657 }
659# Given a pdb Id, get its uniprot id
660# e.g. 6VW1 -> Q9BYF1, P0DTC2, P59594
661def pdb_to_uniprot (pdb_id : str) -> list[ str | NoReferableException ]:
662 # Set the request query
663 query = '''query ($id: String!) {
664 entry(entry_id: $id) {
665 polymer_entities {
666 rcsb_polymer_entity_container_identifiers { uniprot_ids }
667 rcsb_entity_source_organism { scientific_name }
668 entity_poly { pdbx_seq_one_letter_code }
669 }
670 }
671 }'''
672 # Request PDB data
673 parsed_response = request_pdb_data(pdb_id, query)
674 # Mine data
675 uniprot_ids = []
676 # WARNING: Polymers do not come in the same order than in PDB
677 # WARNING: You will not know the entity number by enumerating them
678 for polymer in parsed_response['polymer_entities']:
679 # Get the aminoacids sequence of this polymer (or chain)
680 entity = polymer.get('entity_poly', None)
681 sequence = entity.get('pdbx_seq_one_letter_code', None) if entity else None
682 if sequence:
683 # WARNING: some polymers/chains may have a "special" sequence
684 # It may combine one-letter code with 3-letter code in parenthesis for special aminoacids
685 # e.g. 5JMO, entity 3 -> (DKA)RVK(AR7)(0QE)
686 # e.g. 6ME2, entity 1 -> ... DRYLYI(YCM)HSLKYD ...
687 # e.g. nucleic acids -> (DC)(DA)(DA)(DC)(DC)(DG)(DC)(DA)(DA)(DC)
688 # We simply replace these special aminoacids by X
689 sequence = re.sub(r'\([0-9A-Z]{2,3}\)', 'X', sequence)
690 # Get the uniprot ids associated to this polymer (or chain)
691 identifier = polymer['rcsb_polymer_entity_container_identifiers']
692 uniprots = identifier.get('uniprot_ids', None)
693 # If there are not UniProt ids in this entity then it may be no referable
694 # If we have a no referable entity then we must return an exception with its sequence
695 # Beware that nucleic acids also fall in this section
696 if not uniprots:
697 # If this polymer, whatever it is, has not sequence then there is nothing we can do
698 if not sequence: continue
699 # Nueclic acid sequences should be all X at this point
700 if all(letter == 'X' for letter in sequence): continue
701 # Get the organisms
702 organisms = polymer.get('rcsb_entity_source_organism', None)
703 # Some synthetic constructs may have not defined organisms at all
704 # e.g. 3H11, entity 3
705 if not organisms:
706 uniprot_ids.append( NoReferableException(sequence) )
707 continue
708 # Get scientific names in lower caps since sometimes they are all upper caps
709 scientific_names = set(
710 [ (organism.get('scientific_name') or '').lower() for organism in organisms ])
711 # If we have a synthetic construct then flag the sequence as no referable
712 if 'synthetic construct' in scientific_names:
713 uniprot_ids.append( NoReferableException(sequence) )
714 continue
715 # Check if the sequence of this chain may belong to an antibody
716 print(' Could this be an antibody?')
717 is_antibody = False
718 for reference_sequence in REFERENCE_ANTIBODY_SEQUENCES:
719 if align(reference_sequence, sequence):
720 is_antibody = True
721 break
722 print(f' I guess it {"is" if is_antibody else "is not"} an antibody')
723 # If so, the also set this chain as no referable since antibodies have no UniProt id
724 if is_antibody:
725 uniprot_ids.append( NoReferableException(sequence) )
726 continue
727 # If we did not fall in any of the previous sections then continue
728 continue
729 # If we have multiple uniprots in a single entity then we skip them
730 # Note tha they belong to an entity which is no referable (e.g. a chimeric entity)
731 # See 5GY2 and 7E2Z labeled as chimeric entities and 6e67, which is not labeled likewise
732 if len(uniprots) > 1:
733 warn(f'Multiple UniProt ids in the same entity in {pdb_id} -> Is this a chimeric entity?')
734 if not sequence: raise ValueError(f'Missing sequence with multiple UniProt ids in {pdb_id}')
735 uniprot_ids.append( NoReferableException(sequence) )
736 continue
737 # Normally, a polymer/chain will have one UniProt id only
738 uniprot_id = uniprots[0]
739 uniprot_ids.append(uniprot_id)
740 # Count how many UniProt ids we found
741 actual_uniprot_ids = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == str ]
742 if len(actual_uniprot_ids) > 0:
743 print(f' UniProt ids for PDB id {pdb_id}: ' + ', '.join(actual_uniprot_ids))
744 else: print(f' No UniProt ids were found for PDB id {pdb_id}')
745 # Count how many no referable sequences we found
746 no_refs = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == NoReferableException ]
747 no_refs_count = len(no_refs)
748 if no_refs_count > 0:
749 print(f' Also encountered {no_refs_count} no refereable sequences for PDB id {pdb_id}')
750 # Return them all
751 return uniprot_ids
753# This function is used by the generate_metadata script
754# 1. Get structure sequences
755# 2. Calculate which reference domains are covered by the previous sequence
756# 3. In case it is a covid spike, align the previous sequence against all saved variants (they are in 'utils')
757from mddb_workflow.resources.covid_variants import covid_spike_variants
758def get_sequence_metadata (structure : 'Structure', protein_references_file : 'File', residue_map : dict) -> dict:
759 # Mine sequences from the structure
760 sequences = []
761 # Classify sequences according to if they belong to protein or nucleic sequences
762 # WARNING: We are interested in unique sequence BUT we also want to keep the order
763 # WARNING: Do NOT use sets here to the order of appearance in the structure is respected
764 protein_sequences = []
765 nucleic_sequences = []
766 # Iterate structure chains
767 for chain in structure.chains:
768 # Get the current chain sequence and add it to the list
769 sequence = chain.get_sequence()
770 sequences.append(sequence)
771 # Get the chain classification
772 classification = chain.get_classification()
773 # Depending on what it is, add the sequence also in the corresponding list
774 if classification == 'protein' and sequence not in protein_sequences:
775 protein_sequences.append(sequence)
776 elif (classification == 'dna' or classification == 'rna') and sequence not in nucleic_sequences:
777 nucleic_sequences.append(sequence)
778 # Get values from the residue map
779 # Get protein references from the residues map
780 reference_ids = []
781 references = residue_map['references']
782 if references and len(references) > 0:
783 for ref, ref_type in zip(references, residue_map['reference_types']):
784 if ref_type == 'protein':
785 reference_ids.append(ref)
786 residue_reference_numbers = residue_map['residue_reference_numbers']
787 residue_reference_indices = residue_map['residue_reference_indices']
788 # Load references data, which should already be save to the references data file
789 references_data = import_references(protein_references_file) if protein_references_file.exists else {}
790 # In case we have the SARS-CoV-2 spike among the references, check also which is the variant it belongs to
791 # DANI: Esto es un parche. En un futuro buscaremos una manera mejor de comprovar variantes en cualquier contexto
792 variant = None
793 covid_spike_reference_id = 'P0DTC2'
794 if covid_spike_reference_id in reference_ids:
795 # Load the reference data and find a chain which belongs to the spike
796 # We consider having only one variant, so all chains should return the same result
797 reference_data = references_data[covid_spike_reference_id]
798 covid_spike_reference_index = reference_ids.index(covid_spike_reference_id)
799 sample_residue_index = next((residue_index for residue_index, reference_index in enumerate(residue_reference_indices) if reference_index == covid_spike_reference_index), None)
800 if sample_residue_index == None:
801 raise RuntimeError('Failed to find residue belonging to SARS-CoV-2 variant')
802 sample_chain_sequence = structure.residues[sample_residue_index].chain.get_sequence()
803 # Align the sample chain sequence against all variants to find the best match
804 highest_score = 0
805 print('Finding SARS-CoV-2 variant')
806 for variant_name, variant_sequence in covid_spike_variants.items():
807 print(f' Trying with {variant_name}')
808 align_results = align(variant_sequence, sample_chain_sequence)
809 if not align_results:
810 continue
811 mapping, score = align_results
812 if score > highest_score:
813 highest_score = score
814 variant = variant_name
815 # At this point there should be a result
816 if not variant:
817 raise RuntimeError('Something went wrong trying to find the SARS-CoV-2 variant')
818 print(f'It is {variant}')
819 # Set which reference domains are present in the structure
820 domains = []
821 for reference_index, reference_id in enumerate(reference_ids):
822 # If this is not referable then there are no domains to mine
823 if reference_id == NO_REFERABLE_FLAG:
824 continue
825 # If the reference was not found then there are no domains to mine
826 if reference_id == NOT_FOUND_FLAG:
827 continue
828 # Get residue numbers of residues which belong to the current reference in th residue map
829 residue_numbers = []
830 for residue_index, residue_reference_index in enumerate(residue_reference_indices):
831 if residue_reference_index == reference_index:
832 residue_numbers.append(residue_reference_numbers[residue_index])
833 # Set a function to find if any of the residue numbers is within a given range
834 def in_range (start : int, end : int) -> bool:
835 for residue_number in residue_numbers:
836 if residue_number >= start and residue_number <= end:
837 return True
838 return False
839 # Get the reference data for the current reference uniprot id
840 reference_data = references_data.get(reference_id, None)
841 if not reference_data:
842 raise RuntimeError(reference_id + ' is not in the references data file')
843 # Iterate over data domains
844 for domain in reference_data['domains']:
845 selection = domain['selection']
846 # Iterate over the residue ranges in the selection field
847 residue_ranges = selection.split(', ')
848 for residue_range in residue_ranges:
849 start, end = [ int(num) for num in residue_range.split('-') ]
850 # If this range includes any resiudes in the residue map then the domain is included in the list
851 if in_range(start, end):
852 domains.append(domain['name'])
853 break
854 # Return the sequence matadata
855 return {
856 'sequences': sequences,
857 'protein_sequences': protein_sequences,
858 'nucleic_sequences': nucleic_sequences,
859 'domains': domains,
860 'cv19_variant': variant
861 }