Coverage for model_workflow/tools/generate_map.py: 69%
482 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1import sys
2import json
3import urllib.request
4import numpy as np
5from model_workflow.utils.auxiliar import protein_residue_name_to_letter, NoReferableException
6from model_workflow.utils.auxiliar import InputError, warn, load_json, save_json, request_pdb_data
7from model_workflow.utils.constants import REFERENCE_SEQUENCE_FLAG, NO_REFERABLE_FLAG, NOT_FOUND_FLAG
8from model_workflow.utils.file import File
9from model_workflow.utils.type_hints import *
11import xmltodict
13from Bio import Align
14from Bio.Blast import NCBIWWW
15from Bio.Align import substitution_matrices, Alignment
17# Save current stderr to further restore it
18stderr_backup = sys.stderr
19# Suppress stderr
20#sys.stderr = None
21# Import the function
22#from Bio.SubsMat import MatrixInfo
23# Restore stderr
24#sys.stderr = stderr_backup
26# AGUS: Necesitamos que la secuencia de aa alineada tenga gaps identificados con guiones y pairwaise2 (biopython < 1.80) no lo hace
27# AGUS: Para actualizar a biopython >= 1.80 pairwaise2 ya no existe y no podemos obtener el mismo outoput que necesitábamos
28# AGUS: Esta función parece resolver el problema: https://github.com/biopython/biopython/issues/4769
29# Set the aligner
30aligner = Align.PairwiseAligner(mode='local')
31aligner.substitution_matrix = substitution_matrices.load("BLOSUM62")
32aligner.open_gap_score = -10
33aligner.extend_gap_score = -0.5
34def add_leading_and_trailing_gaps(alignment: Alignment) -> Alignment:
35 coords = alignment.coordinates
37 # We need to add two columns on each side of the coordinates matrix.
38 # The first column will always point to the start of the sequences,
39 # hence filled with zeros.
40 new_coords = np.zeros((2, coords.shape[1] + 4), dtype=int)
42 # The last w column will always point to the end of sequences,
43 # hence filled with the length of the respective sequences.
44 target_len = len(alignment.sequences[0])
45 query_len = len(alignment.sequences[1])
46 last_col = np.array([target_len, query_len])
47 new_coords[:, -1] = last_col
49 # The middle columns will contain the original coordinates.
50 new_coords[:, 2:-2] = coords
52 # The second and one before last columns will point to the end of sequence,
53 # with less unaligned positions.
54 new_coords[:, 1] = coords[:, 0] - coords[:, 0].min()
55 new_coords[:, -2] = coords[:, -1] + (last_col - coords[:, -1]).min()
56 return Alignment(sequences=alignment.sequences, coordinates=new_coords)
58# Map the structure aminoacids sequences against the standard reference sequences
59# References are uniprot accession ids and they are optional
60# For each reference, align the reference sequence with the topology sequence
61# Chains which do not match any reference sequence will be blasted
62# Note that an internet connection is required both to retireve the uniprot reference sequence and to do the blast
63# NEVER FORGET: This system relies on the fact that topology chains are not repeated
64def generate_protein_mapping (
65 structure : 'Structure',
66 output_filepath : str,
67 database_url : str,
68 register : dict,
69 mercy : List[str] = [],
70 input_protein_references : Union[list,dict] = [],
71 pdb_ids : List[str] = [],
72) -> dict:
73 """Map the structure aminoacids sequences against the Uniprot reference sequences."""
74 # Set the output file
75 protein_references_file = File(output_filepath)
77 # Remove previous warnings, if any
78 register.remove_warnings(REFERENCE_SEQUENCE_FLAG)
79 # Forced references must be list or dict
80 # If it is none then we set it as an empty list
81 if input_protein_references == None:
82 input_protein_references = []
83 # If forced references is a list of dictionaries then it means the input is wrongly formatted
84 # This may happen since the inputs file is in YAML format, and a simple hyphen makes the difference
85 # We can fix it from here anyway
86 if type(input_protein_references) == list and len(input_protein_references) > 0 and type(input_protein_references[0]) == dict:
87 input_protein_references = { k: v for fr in input_protein_references for k, v in fr.items() }
88 # Check if the forced references are strict (i.e. reference per chain, as a dictionary) or flexible (list of references)
89 strict_references = type(input_protein_references) == dict
90 # Check the "no referable" flag not to be passed when references are not strict
91 if not strict_references and NO_REFERABLE_FLAG in input_protein_references:
92 raise SystemExit('WRONG INPUT: The "no referable" flag cannot be passed in a list.' \
93 ' You must use a chain keys dictionary (e.g. {"A":"' + NO_REFERABLE_FLAG + '"})')
94 # Store all the references which are got through this process
95 # Note that not all references may be used at the end
96 references = {}
97 # Given a uniprot accession, get the reference object
98 # Try first asking to the MDposit database in case the reference exists already
99 # If not, retrieve UniProt data and build the reference object
100 # Return also a boolean to set if the reference already existed (True) or not (False)
101 def get_reference (uniprot_accession : str) -> Tuple[dict, bool]:
102 reference = references.get(uniprot_accession, None)
103 if reference:
104 return reference, True
105 reference = get_mdposit_reference(uniprot_accession, database_url)
106 if reference:
107 return reference, True
108 reference = get_uniprot_reference(uniprot_accession)
109 return reference, False
110 # Import local references, in case the references json file already exists
111 imported_references = None
112 if protein_references_file.exists:
113 imported_references = import_references(protein_references_file)
114 # Append the imported references to the overall references pool
115 for k,v in imported_references.items():
116 references[k] = v
117 # Get the structure chain sequences
118 parsed_chains = get_parsed_chains(structure)
119 # Find out which chains are protein
120 protein_parsed_chains = []
121 for chain_data in parsed_chains:
122 sequence = chain_data['sequence']
123 if next((letter for letter in sequence if letter != 'X'), None):
124 chain_data['match'] = { 'ref': None, 'map': None, 'score': 0 }
125 protein_parsed_chains.append(chain_data)
126 # If there are no protein sequences then there is no need to map anything
127 if len(protein_parsed_chains) == 0:
128 print(' There are no protein sequences')
129 return protein_parsed_chains
130 # For each input forced reference, get the reference sequence
131 reference_sequences = {}
132 # Save already tried alignments to not repeat the alignment further
133 tried_alignments = { chain_data['name']: [] for chain_data in protein_parsed_chains }
134 # Set a function to try to match all protein sequences with the available reference sequences
135 # In case of match, objects in the 'protein_parsed_chains' list are modified by adding the result
136 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not
137 def match_sequences () -> bool:
138 # Track each chain-reference alignment match and keep the score of successful alignments
139 # Now for each structure sequence, align all reference sequences and keep the best alignment (if it meets the minimum)
140 for chain_data in protein_parsed_chains:
141 chain = chain_data['name']
142 chain_tried_alignments = tried_alignments[chain]
143 # In case references are forced per chain check if there is a reference for this chain and match according to this
144 if strict_references:
145 # Get the forced specific chain for this sequence, if any
146 forced_reference = input_protein_references.get(chain, None)
147 if forced_reference:
148 # If the chain has a specific forced reference then we must align it just once
149 # Skip this process in further matches
150 if chain_data['match']['ref']:
151 continue
152 # In case the forced reference is the "no referable" flag
153 # Thus it has no reference sequence and we must not try to match it
154 # Actually, any match would be accidental and not correct
155 if forced_reference == NO_REFERABLE_FLAG:
156 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG }
157 continue
158 # In case the forced reference is the "not found" flag
159 # This should not happend but we may be using references as forced references, so just in case
160 if forced_reference == NOT_FOUND_FLAG:
161 chain_data['match'] = { 'ref': NOT_FOUND_FLAG }
162 continue
163 # Get the forced reference sequence and align it to the chain sequence in order to build the map
164 reference_sequence = reference_sequences[forced_reference]
165 print(f' Aligning chain {chain} with {forced_reference} reference sequence')
166 align_results = align(reference_sequence, chain_data['sequence'])
167 # The align must match or we stop here and warn the user
168 if not align_results:
169 raise SystemExit(f'Forced reference {chain} -> {forced_reference} does not match in sequence')
170 sequence_map, align_score = align_results
171 reference = references[forced_reference]
172 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score }
173 continue
174 # If the chain has already a match and this match is among the forced references then stop here
175 # Forced references have priority and this avoids having a match with a not forced reference further
176 if chain_data['match']['ref'] and chain_data['match']['ref']['uniprot'] in input_protein_references:
177 continue
178 # Iterate over the different available reference sequences
179 for uniprot_id, reference_sequence in reference_sequences.items():
180 # If this alignment has been tried already then skip it
181 if uniprot_id in chain_tried_alignments:
182 continue
183 # Align the structure sequence with the reference sequence
184 # NEVER FORGET: This system relies on the fact that topology chains are not repeated
185 print(f' Aligning chain {chain} with {uniprot_id} reference sequence')
186 align_results = align(reference_sequence, chain_data['sequence'])
187 tried_alignments[chain].append(uniprot_id) # Save the alignment try, no matter if it works or not
188 if not align_results:
189 continue
190 # In case we have a valid alignment, check the alignment score is better than the current reference score (if any)
191 sequence_map, align_score = align_results
192 current_reference = chain_data['match']
193 if current_reference['score'] > align_score:
194 continue
195 # If the match is a 'no referable' exception then set a no referable flag
196 if type(uniprot_id) == NoReferableException:
197 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG }
198 continue
199 # Proceed to set the corresponding reference toherwise
200 reference = references[uniprot_id]
201 # If the alignment is better then we impose the new reference
202 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score }
203 # Sum up the current matching
204 print(' Protein reference summary:')
205 for chain_data in parsed_chains:
206 name = chain_data['name']
207 match = chain_data.get('match', None)
208 if not match:
209 print(f' {name} -> Not protein')
210 continue
211 reference = chain_data['match'].get('ref', None)
212 if not reference:
213 print(f' {name} -> ¿?')
214 continue
215 if reference == NO_REFERABLE_FLAG:
216 print(f' {name} -> No referable')
217 continue
218 if reference == NOT_FOUND_FLAG:
219 print(f' {name} -> Not found')
220 continue
221 uniprot_id = reference['uniprot']
222 print(f' {name} -> {uniprot_id}')
223 # Export already matched references
224 export_references(protein_parsed_chains, protein_references_file)
225 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not
226 allright = all([ chain_data['match']['ref'] for chain_data in protein_parsed_chains ])
227 # If we match all chains then make sure there is no forced reference missing which did not match
228 # Otherwise stop here and force the user to remove these forced uniprot ids from the inputs file
229 # WARNING: Although we could move on it is better to stop here and warn the user to prevent a future silent problem
230 if allright and input_protein_references:
231 # Get forced uniprot ids
232 forced_uniprot_ids = set(list(input_protein_references.values()) if strict_references else input_protein_references)
233 forced_uniprot_ids -= { NOT_FOUND_FLAG, NO_REFERABLE_FLAG }
234 #forced_uniprot_ids.remove(NO_REFERABLE_FLAG)
235 #forced_uniprot_ids.remove(NOT_FOUND_FLAG)
236 # Get matched uniprot ids
237 matched_references = [ chain_data['match']['ref'] for chain_data in protein_parsed_chains ]
238 matched_uniprot_ids = set([ ref['uniprot'] for ref in matched_references if type(ref) == dict ])
239 # Check the difference
240 unmatched_uniprot_ids = forced_uniprot_ids - matched_uniprot_ids
241 if len(unmatched_uniprot_ids) > 0:
242 log = ', '.join(unmatched_uniprot_ids)
243 raise InputError(f'Some forced references were not matched with any protein sequence: {log}\n'
244 ' Please remove them from the inputs file')
245 return allright
246 # --- End of match_sequences function --------------------------------------------------------------------------------
247 # First use the forced references for the matching
248 if input_protein_references:
249 forced_uniprot_ids = list(input_protein_references.values()) if strict_references else input_protein_references
250 for uniprot_id in forced_uniprot_ids:
251 # If instead of a uniprot id there is a 'no referable' flag then we skip this process
252 if uniprot_id == NO_REFERABLE_FLAG:
253 continue
254 # If instead of a uniprot id there is a 'not found' flag then we skip this process
255 # This should not happend but we may be using references as forced references, so just in case
256 if uniprot_id == NOT_FOUND_FLAG:
257 continue
258 # If reference is already in the list (i.e. it has been imported) then skip this process
259 reference = references.get(uniprot_id, None)
260 if reference:
261 reference_sequences[uniprot_id] = reference['sequence']
262 continue
263 # Find the reference data for the given uniprot id
264 reference, already_loaded = get_reference(uniprot_id)
265 reference_sequences[uniprot_id] = reference['sequence']
266 # Save the current whole reference object for later
267 references[reference['uniprot']] = reference
268 # Now that we have all forced references data perform the matching
269 # If we have every protein chain matched with a reference then we stop here
270 print(' Using only forced references from the inputs file')
271 if match_sequences():
272 return protein_parsed_chains
273 # Now add the imported references to reference sequences. Thus now they will be 'matchable'
274 # Thus now they will be 'matchable', so try to match sequences again in case any of the imported references has not been tried
275 # It was not done before since we want forced references to have priority
276 if imported_references:
277 need_rematch = False
278 for uniprot_id, reference in imported_references.items():
279 # If the imported reference has been aligned already (i.e. it was a forced reference)
280 if uniprot_id in reference_sequences:
281 continue
282 # Otherwise, include it
283 need_rematch = True
284 reference_sequences[uniprot_id] = reference['sequence']
285 # If there was at least one imported reference missing then rerun the matching
286 if need_rematch:
287 print(' Using also references imported from references.json')
288 if match_sequences():
289 return protein_parsed_chains
290 # If there are still any chain which is not matched with a reference then we need more references
291 # To get them, retrieve all uniprot ids associated to the pdb ids, if any
292 if pdb_ids and len(pdb_ids) > 0:
293 for pdb_id in pdb_ids:
294 # Ask PDB
295 uniprot_ids = pdb_to_uniprot(pdb_id)
296 for uniprot_id in uniprot_ids:
297 # If this is not an actual UniProt, but a no referable exception, then handle it
298 # We must find the matching sequence and set the corresponding chain as no referable
299 if type(uniprot_id) == NoReferableException:
300 reference_sequences[uniprot_id] = uniprot_id.sequence
301 continue
302 # Build a new reference from the resulting uniprot
303 reference, already_loaded = get_reference(uniprot_id)
304 if reference == None:
305 continue
306 reference_sequences[reference['uniprot']] = reference['sequence']
307 # Save the current whole reference object for later
308 references[reference['uniprot']] = reference
309 # If we have every protein chain matched with a reference already then we stop here
310 print(' Using also references related to PDB ids from the inputs file')
311 if match_sequences():
312 return protein_parsed_chains
313 # If there are still any chain which is not matched with a reference then we need more references
314 # To get them, we run a blast with each orphan chain sequence
315 for chain_data in protein_parsed_chains:
316 # Skip already references chains
317 if chain_data['match']['ref']:
318 continue
319 # Run the blast
320 sequence = chain_data['sequence']
321 uniprot_id = blast(sequence)
322 if not uniprot_id:
323 chain_data['match'] = { 'ref': NOT_FOUND_FLAG }
324 continue
325 # Build a new reference from the resulting uniprot
326 reference, already_loaded = get_reference(uniprot_id)
327 reference_sequences[reference['uniprot']] = reference['sequence']
328 # Save the current whole reference object for later
329 references[reference['uniprot']] = reference
330 # If we have every protein chain matched with a reference already then we stop here
331 print(' Using also references from blast')
332 if match_sequences():
333 return protein_parsed_chains
334 # At this point we should have macthed all sequences
335 # If not, kill the process unless mercy was given
336 must_be_killed = REFERENCE_SEQUENCE_FLAG not in mercy
337 if must_be_killed:
338 raise SystemExit('BLAST failed to find a matching reference sequence for at least one protein sequence')
339 warn('BLAST failed to find a matching reference sequence for at least one protein sequence')
340 register.add_warning(REFERENCE_SEQUENCE_FLAG, 'There is at least one protein region which is not mapped to any reference sequence')
341 return protein_parsed_chains
343# Export reference objects data to a json file
344# This file is used by the loader to load new references to the database
345# Note that all references are saved to this json file, even those which are already in the database
346# It is the loader who is the responsible to check which references must be loaded and which ones are loaded already
347# Note that mapping data (i.e. which residue belongs to each reference) is not saved
348def export_references (mapping_data : list, protein_references_file : 'File'):
349 final_references = []
350 final_uniprots = []
351 for data in mapping_data:
352 match = data['match']
353 ref = match['ref']
354 if not ref or ref == NO_REFERABLE_FLAG or ref == NOT_FOUND_FLAG:
355 continue
356 uniprot = ref['uniprot']
357 if uniprot in final_uniprots:
358 continue
359 final_references.append(ref)
360 final_uniprots.append(uniprot)
361 # If there are no references to exported then do not egenrate the json file
362 if len(final_references) == 0:
363 return
364 # Write references to a json file
365 save_json(final_references, protein_references_file.path, indent = 4)
367# Import reference json file so we do not have to rerun this process
368def import_references (protein_references_file : 'File') -> list:
369 print(' Importing references from ' + protein_references_file.path)
370 # Read the file
371 file_content = load_json(protein_references_file.path)
372 # Format data as the process expects to find it
373 references = {}
374 for reference in file_content:
375 uniprot = reference['uniprot']
376 references[uniprot] = reference
377 return references
379# Get each chain name and aminoacids sequence in a topology
380# Output format example: [ { 'sequence': 'VNLTT', 'indices': [1, 2, 3, 4, 5] }, ... ]
381def get_parsed_chains (structure : 'Structure') -> list:
382 parsed_chains = []
383 chains = structure.chains
384 for chain in chains:
385 name = chain.name
386 sequence = ''
387 residue_indices = []
388 for residue in chain.residues:
389 letter = protein_residue_name_to_letter(residue.name)
390 sequence += letter
391 residue_indices.append(residue.index)
392 sequence_object = { 'name': name, 'sequence': sequence, 'residue_indices': residue_indices }
393 parsed_chains.append(sequence_object)
394 return parsed_chains
396# Align two aminoacid sequences
397# Return a list with the reference residue indexes (values)
398# which match each new sequence residues indexes (indexes)
399# Return also the score of the alignment
400# Return None when there is not valid alignment at all
401# Set verbose = True to see a visual summary of the sequence alignments in the logs
402def align (ref_sequence : str, new_sequence : str, verbose : bool = False) -> Optional[ Tuple[list, float] ]:
404 #print('- REFERENCE\n' + ref_sequence + '\n- NEW\n' + new_sequence)
406 # If the new sequence is all 'X' stop here, since this would make the alignment infinite
407 # Then an array filled with None is returned
408 if all([ letter == 'X' for letter in new_sequence ]):
409 return None
411 # Return the new sequence as best aligned as possible with the reference sequence
412 #alignments = pairwise2.align.localds(ref_sequence, new_sequence, MatrixInfo.blosum62, -10, -0.5)
413 # DANI: Habría que hacerlo de esta otra forma según el deprecation warning (arriba hay más código)
414 # DANI: El problema es que el output lo tiene todo menos la sequencia en formato alienada
415 # DANI: i.e. formato '----VNLTT' (con los gaps identificados con guiones), que es justo el que necesito
416 # Actualized code for biopython >= 1.80
417 alignments = aligner.align(ref_sequence, new_sequence)
419 # In case there are no alignments it means the current chain has nothing to do with this reference
420 # Then an array filled with None is returned
421 if len(alignments) == 0:
422 return None
424 # Several alignments may be returned, specially when it is a difficult or impossible alignment
425 # Output format example: '----VNLTT'
426 best_alignment = alignments[0]
427 best_alignment_with_gaps = add_leading_and_trailing_gaps(best_alignment)
428 # Extract the allignment sequences
429 #aligned_ref, aligned_sequence = best_alignment_with_gaps.sequences
430 aligned_ref = best_alignment_with_gaps[0] # AGUS: Puede ser útil en el futuro
431 aligned_sequence = best_alignment_with_gaps[1]
432 # Obtain the score of the alignment
433 score = best_alignment.score
434 # Calculate the normalized score
435 normalized_score = score / len(new_sequence)
437 # If the normalized score does not reaches the minimum we consider the alignment is not valid
438 # It may happen when the reference goes for a specific chain but we must map all chains
439 # This 1 has been found experimentally
440 # Non maching sequence may return a 0.1-0.3 normalized score
441 # Matching sequence may return >4 normalized score
442 if normalized_score < 2:
443 print(' Not valid alignment')
444 return None
446 # Tell the user about the success
447 beautiful_normalized_score = round(normalized_score * 100) / 100
448 print(f' Valid alignment -> Normalized score = {beautiful_normalized_score}')
450 # Match each residue
451 aligned_mapping = []
452 aligned_index = 0
453 for l, letter in enumerate(aligned_sequence):
454 # Guions are skipped
455 if letter == '-':
456 continue
457 # Get the current residue of the new sequence
458 equivalent_letter = new_sequence[aligned_index]
459 if not letter == equivalent_letter:
460 raise SystemExit(f'Something was wrong at position {l} :S')
461 # 'X' residues cannot be mapped since reference sequences should never have any 'X'
462 if letter == 'X':
463 aligned_mapping.append(None)
464 aligned_index += 1
465 continue
466 # Otherwise add the equivalent aligned index to the mapping
467 # WARNING: Add +1 since uniprot residue counts start at 1, not 0
468 aligned_mapping.append(l + 1)
469 aligned_index += 1
471 return aligned_mapping, normalized_score
473# Given an aminoacids sequence, return a list of uniprot ids
474# Note that we are blasting against UniProtKB / Swiss-Prot so results will always be valid UniProt accessions
475# WARNING: This always means results will correspond to curated entries only
476# If your sequence is from an exotic organism the result may be not from it but from other more studied organism
477def blast (sequence : str) -> Optional[str]:
478 print('Throwing blast...')
479 result = NCBIWWW.qblast(
480 program = "blastp",
481 database = "swissprot", # UniProtKB / Swiss-Prot
482 sequence = sequence,
483 )
484 parsed_result = xmltodict.parse(result.read())
485 hits = parsed_result['BlastOutput']['BlastOutput_iterations']['Iteration']['Iteration_hits']
486 # When there is no result return None
487 # Note that this is possible although hardly unprobable
488 if not hits:
489 return None
490 # Get the first result only
491 result = hits['Hit'][0]
492 # Return the accession
493 # DANI: Si algun día tienes problemas porque te falta el '.1' al final del accession puedes sacarlo de Hit_id
494 accession = result['Hit_accession']
495 print('Result: ' + accession)
496 return accession
498# Given a uniprot accession, use the MDposit API to request its data in case it is already in the database
499# If the reference is not found or there is any error then return None and keep going, we do not really need this
500def get_mdposit_reference (uniprot_accession : str, database_url : str) -> Optional[dict]:
501 # Request MDposit
502 request_url = f'{database_url}rest/v1/references/{uniprot_accession}'
503 try:
504 with urllib.request.urlopen(request_url) as response:
505 parsed_response = json.loads(response.read().decode("utf-8"))
506 # If the accession is not found in the database then we stop here
507 except urllib.error.HTTPError as error:
508 # If the uniprot accession is not yet in the MDposit references then return None
509 if error.code == 404:
510 return None
511 else:
512 warn(f'Something went wrong with the MDposit request {request_url} (error {error.code})')
513 warn(f'Protein reference {uniprot_accession} will be made again even if it exists')
514 return None
515 # This error may occur if there is no internet connection
516 except urllib.error.URLError as error:
517 # The error variable as is do not work properly
518 # Its 'errno' value is None (at least for tiemout errors)
519 actual_error = error.args[0]
520 if actual_error.errno == 110:
521 print('Timeout error when requesting MDposit. Is the node fallen?')
522 warn('Something went wrong with the MDposit request ' + request_url)
523 warn(f'Protein reference {uniprot_accession} will be made again even if it exists')
524 return None
525 return parsed_response
527# Given a uniprot accession, use the uniprot API to request its data and then mine what is needed for the database
528def get_uniprot_reference (uniprot_accession : str) -> Optional[dict]:
529 # Request Uniprot
530 request_url = 'https://www.ebi.ac.uk/proteins/api/proteins/' + uniprot_accession
531 parsed_response = None
532 try:
533 with urllib.request.urlopen(request_url) as response:
534 parsed_response = json.loads(response.read().decode("utf-8"))
535 # If the accession is not found in UniProt then the id is not valid
536 except urllib.error.HTTPError as error:
537 if error.code == 404:
538 print('WARNING: Cannot find UniProt entry for accession ' + uniprot_accession)
539 return None
540 print('Error when requesting ' + request_url)
541 raise ValueError(f'Something went wrong with the Uniprot request (error {error.code})')
542 # If we have not a response at this point then it may mean we are trying to access an obsolete entry (e.g. P01607)
543 if parsed_response == None:
544 print('WARNING: Cannot find UniProt entry for accession ' + uniprot_accession)
545 return None
546 # Get the full protein name
547 protein_data = parsed_response['protein']
548 protein_name_data = protein_data.get('recommendedName', None)
549 # DANI: It is possible that the 'recommendedName' is missing if it is not a reviewed UniProt entry
550 if not protein_name_data:
551 warn(f'The UniProt accession {uniprot_accession} is missing the recommended name. You should consider changing the reference.')
552 protein_name_data = protein_data.get('submittedName', None)[0]
553 if not protein_name_data:
554 raise ValueError('Unexpected structure in UniProt response for accession ' + uniprot_accession)
555 protein_name = protein_name_data['fullName']['value']
556 # Get the gene names as a single string
557 gene_names = []
558 # WARNING: Some uniprot entries are missing gene names (e.g. P00718)
559 # WARNING: Some uniprot entries have names in non-canonical keys (e.g. orfNames, olnNames)
560 # These names are not shown event in the uniprot web page so we also skip them
561 genes = parsed_response.get('gene', [])
562 for gene in genes:
563 gene_name = gene.get('name', None)
564 if not gene_name:
565 continue
566 gene_names.append(gene_name['value'])
567 gene_names = ', '.join(gene_names) if len(gene_names) > 0 else None
568 # Get the organism name
569 organism = parsed_response['organism']['names'][0]['value']
570 # Get the aminoacids sequence
571 sequence = parsed_response['sequence']['sequence']
572 # Get interesting regions to be highlighted in the client
573 domains = []
574 # WARNING: Some uniprot entries are missing features (e.g. O27908)
575 features = parsed_response.get('features', [])
576 # Get comments data which may be useful to further build domain descriptions
577 comments_data = parsed_response.get('comments', None)
578 # There are many types of features but in this case we will focus on domanins and similar
579 target_types = ['CHAIN', 'REGION', 'DOMAIN', 'MOTIF', 'SITE']
580 for feature in features:
581 # Skip features of other types
582 if feature['type'] not in target_types:
583 continue
584 # Get the domain name
585 name = feature['description']
586 # Build the domain description from the coments data
587 description = None
588 if comments_data:
589 comments = [ comment for comment in parsed_response['comments'] if name == comment.get('molecule', None) ]
590 comment_text = []
591 for comment in comments:
592 if not comment.get('text', False): continue
593 text = comment.get('text', None)
594 if text == None: raise ValueError('Unexpected UniProt response format: no text in comment')
595 # DANI: el comment 'text' casi siempre es una lista
596 # DANI: solo tengo constancia de una vez en que era un string directamente
597 # DANI: en uno de los comentarios de https://www.ebi.ac.uk/proteins/api/proteins/Q15465
598 if type(text) == str: comment_text.append(text)
599 elif type(text) == list: comment_text.append(text[0]['value'])
600 else: raise ValueError('Unexpected UniProt response format: text in comment is neither str or list')
601 description = '\n\n'.join(comment_text)
602 # Set the domain selection
603 # The domain 'begin' and 'end' values may include non-numeric symbols such as '~', '>' or '<'
604 # These values are usually ignored or replaced by '?' in the UniProt web client
605 # There may be not numeric value at all (e.g. Q99497)
606 # In these cases uniprot shows the domain but it does not allow to use its functionallities
607 # e.g. you can not blast using the domain sequence
608 # It makes not sense having a domain with unkown range to me so we skip these domains
609 begin = feature['begin'].replace('~', '').replace('>', '').replace('<', '')
610 end = feature['end'].replace('~', '').replace('>', '').replace('<', '')
611 if begin == '' or end == '':
612 continue
613 selection = begin + '-' + end
614 # If we already have a domain with the same name then join both domains
615 # For instance, you may have several repetitions of the 'Disordered' region
616 already_existing_domain = next((domain for domain in domains if domain['name'] == name), None)
617 if already_existing_domain:
618 already_existing_domain['selection'] += ', ' + selection
619 continue
620 # Otherwise, create a new domain
621 domain = {
622 'name': name,
623 'selection': selection
624 }
625 # Add a description only if we succesfully mined it
626 # Note that only features tagged as CHAIN have comments (not sure about this)
627 if description:
628 domain['description'] = description
629 # Add the new domain to the list
630 domains.append(domain)
631 # Mine protein functions from Gene Ontology references
632 # Get database references
633 db_references = parsed_response['dbReferences']
634 # Get references from Gene Ontology (GO) only
635 go_references = [ ref for ref in db_references if ref['type'] == 'GO' ]
636 # A Gene Ontology entry may be one of three types:
637 # Cellular Component (C), Molecular Function (F) and Biological Process (P)
638 # In this case we are interested in protein function only so we will keep those with the 'F' header
639 functions = [ ref['properties']['term'][2:] for ref in go_references if ref['properties']['term'][0:2] == 'F:' ]
640 # Set the final reference to be uploaded to the database
641 return {
642 'name': protein_name,
643 'gene': gene_names,
644 'organism': organism,
645 'uniprot': uniprot_accession,
646 'sequence': sequence,
647 'domains': domains,
648 'functions': functions
649 }
651# Given a pdb Id, get its uniprot id
652# e.g. 6VW1 -> Q9BYF1, P0DTC2, P59594
653def pdb_to_uniprot (pdb_id : str) -> List[ Union[str, NoReferableException] ]:
654 # Set the request query
655 query = '''query ($id: String!) {
656 entry(entry_id: $id) {
657 polymer_entities {
658 rcsb_polymer_entity_container_identifiers { uniprot_ids }
659 rcsb_entity_source_organism { scientific_name }
660 entity_poly { pdbx_seq_one_letter_code }
661 }
662 }
663 }'''
664 # Request PDB data
665 parsed_response = request_pdb_data(pdb_id, query)
666 # Mine data
667 uniprot_ids = []
668 for polymer in parsed_response['polymer_entities']:
669 identifier = polymer['rcsb_polymer_entity_container_identifiers']
670 # Get the uniprot
671 uniprots = identifier.get('uniprot_ids', None)
672 # If there are not UniProts at all in this entity skip to the next
673 if not uniprots:
674 # However, it may happen the the polymer organism is set as synthetic construct
675 # In this case we keep the sequence and return a no referable exception
676 organisms = polymer.get('rcsb_entity_source_organism', None)
677 if not organisms: raise ValueError('Missing organism')
678 # Get scientific names in lower caps since sometimes they are all upper caps
679 scientific_names = set(
680 [ organism.get('scientific_name', '').lower() for organism in organisms ])
681 if 'synthetic construct' in scientific_names:
682 entity = polymer.get('entity_poly', None)
683 if not entity: raise ValueError(f'Missing entity in {pdb_id}')
684 sequence = entity.get('pdbx_seq_one_letter_code', None)
685 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}')
686 uniprot_ids.append( NoReferableException(sequence) )
687 continue
688 # If we have multiple uniprots in a single entity then we skip them
689 # Note tha they belong to an entity which is no referable (e.g. a chimeric entity)
690 # See 5GY2 and 7E2Z labeled as chimeric entities and 6e67, which is not labeled likewise
691 if len(uniprots) > 1:
692 warn(f'Multiple UniProt ids in the same entity in {pdb_id} -> Is this a chimeric entity?')
693 entity = polymer.get('entity_poly', None)
694 if not entity: raise ValueError(f'Missing entity in {pdb_id}')
695 sequence = entity.get('pdbx_seq_one_letter_code', None)
696 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}')
697 uniprot_ids.append( NoReferableException(sequence) )
698 continue
699 uniprot_id = uniprots[0]
700 uniprot_ids.append(uniprot_id)
701 actual_uniprot_ids = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == str ]
702 if len(actual_uniprot_ids) > 0:
703 print(f' UniProt ids for PDB id {pdb_id}: ' + ', '.join(actual_uniprot_ids))
704 else: print(f' No UniProt ids were found for PDB id {pdb_id}')
705 no_refs = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == NoReferableException ]
706 no_refs_count = len(no_refs)
707 if no_refs_count > 0:
708 print(f' Also encountered {no_refs_count} no refereable sequences for PDB id {pdb_id}')
709 return uniprot_ids
711# This function is used by the generate_metadata script
712# 1. Get structure sequences
713# 2. Calculate which reference domains are covered by the previous sequence
714# 3. In case it is a covid spike, align the previous sequence against all saved variants (they are in 'utils')
715from model_workflow.resources.covid_variants import covid_spike_variants
716def get_sequence_metadata (structure : 'Structure', protein_references_file : 'File', residue_map : dict) -> dict:
717 # Mine sequences from the structure
718 sequences = []
719 # Classify sequences according to if they belong to protein or nucleic sequences
720 # WARNING: We are interested in unique sequence BUT we also want to keep the order
721 # WARNING: Do NOT use sets here to the order of appearance in the structure is respected
722 protein_sequences = []
723 nucleic_sequences = []
724 # Iterate structure chains
725 for chain in structure.chains:
726 # Get the current chain sequence and add it to the list
727 sequence = chain.get_sequence()
728 sequences.append(sequence)
729 # Get the chain classification
730 classification = chain.get_classification()
731 # Depending on what it is, add the sequence also in the corresponding list
732 if classification == 'protein' and sequence not in protein_sequences:
733 protein_sequences.append(sequence)
734 elif (classification == 'dna' or classification == 'rna') and sequence not in nucleic_sequences:
735 nucleic_sequences.append(sequence)
736 # Get values from the residue map
737 # Get protein references from the residues map
738 reference_ids = []
739 references = residue_map['references']
740 if references and len(references) > 0:
741 for ref, ref_type in zip(references, residue_map['reference_types']):
742 if ref_type == 'protein':
743 reference_ids.append(ref)
744 residue_reference_numbers = residue_map['residue_reference_numbers']
745 residue_reference_indices = residue_map['residue_reference_indices']
746 # Load references data, which should already be save to the references data file
747 references_data = import_references(protein_references_file) if protein_references_file.exists else {}
748 # In case we have the SARS-CoV-2 spike among the references, check also which is the variant it belongs to
749 # DANI: Esto es un parche. En un futuro buscaremos una manera mejor de comprovar variantes en cualquier contexto
750 variant = None
751 covid_spike_reference_id = 'P0DTC2'
752 if covid_spike_reference_id in reference_ids:
753 # Load the reference data and find a chain which belongs to the spike
754 # We consider having only one variant, so all chains should return the same result
755 reference_data = references_data[covid_spike_reference_id]
756 covid_spike_reference_index = reference_ids.index(covid_spike_reference_id)
757 sample_residue_index = next((residue_index for residue_index, reference_index in enumerate(residue_reference_indices) if reference_index == covid_spike_reference_index), None)
758 if sample_residue_index == None:
759 raise SystemExit('Failed to find residue belonging to SARS-CoV-2 variant')
760 sample_chain_sequence = structure.residues[sample_residue_index].chain.get_sequence()
761 # Align the sample chain sequence against all variants to find the best match
762 highest_score = 0
763 print('Finding SARS-CoV-2 variant')
764 for variant_name, variant_sequence in covid_spike_variants.items():
765 print(f' Trying with {variant_name}')
766 align_results = align(variant_sequence, sample_chain_sequence)
767 if not align_results:
768 continue
769 mapping, score = align_results
770 if score > highest_score:
771 highest_score = score
772 variant = variant_name
773 # At this point there should be a result
774 if not variant:
775 raise SystemExit('Something went wrong trying to find the SARS-CoV-2 variant')
776 print(f'It is {variant}')
777 # Set which reference domains are present in the structure
778 domains = []
779 for reference_index, reference_id in enumerate(reference_ids):
780 # If this is not referable then there are no domains to mine
781 if reference_id == NO_REFERABLE_FLAG:
782 continue
783 # If the reference was not found then there are no domains to mine
784 if reference_id == NOT_FOUND_FLAG:
785 continue
786 # Get residue numbers of residues which belong to the current reference in th residue map
787 residue_numbers = []
788 for residue_index, residue_reference_index in enumerate(residue_reference_indices):
789 if residue_reference_index == reference_index:
790 residue_numbers.append(residue_reference_numbers[residue_index])
791 # Set a function to find if any of the residue numbers is within a given range
792 def in_range (start : int, end : int) -> bool:
793 for residue_number in residue_numbers:
794 if residue_number >= start and residue_number <= end:
795 return True
796 return False
797 # Get the reference data for the current reference uniprot id
798 reference_data = references_data.get(reference_id, None)
799 if not reference_data:
800 raise SystemExit(reference_id + ' is not in the references data file')
801 # Iterate over data domains
802 for domain in reference_data['domains']:
803 selection = domain['selection']
804 # Iterate over the residue ranges in the selection field
805 residue_ranges = selection.split(', ')
806 for residue_range in residue_ranges:
807 start, end = [ int(num) for num in residue_range.split('-') ]
808 # If this range includes any resiudes in the residue map then the domain is included in the list
809 if in_range(start, end):
810 domains.append(domain['name'])
811 break
812 # Return the sequence matadata
813 return {
814 'sequences': sequences,
815 'protein_sequences': protein_sequences,
816 'nucleic_sequences': nucleic_sequences,
817 'domains': domains,
818 'cv19_variant': variant
819 }