Coverage for mddb_workflow / tools / generate_map.py: 57%

493 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 18:45 +0000

1import sys 

2import json 

3import urllib.request 

4import numpy as np 

5import re 

6 

7from mddb_workflow.utils.auxiliar import protein_residue_name_to_letter, NoReferableException 

8from mddb_workflow.utils.auxiliar import InputError, warn, load_json, save_json, request_pdb_data 

9from mddb_workflow.utils.constants import REFERENCE_SEQUENCE_FLAG, NO_REFERABLE_FLAG, NOT_FOUND_FLAG 

10from mddb_workflow.utils.file import File 

11from mddb_workflow.utils.type_hints import * 

12 

13import xmltodict 

14 

15from Bio import Align 

16from Bio.Blast import NCBIWWW 

17from Bio.Align import substitution_matrices, Alignment 

18 

19# Set generic sequences which should be similar to known antibodies 

20REFERENCE_ANTIBODY_SEQUENCES = { 

21 # Single domain antibody 

22 # This sequence has been designed with the aid of DeepSeek 

23 # CDR regionds have been replaced by X 

24 'QVQLVESGGGLVQPGGSLRLSCAASXXXXXXXWYRQAPGKEREFVAXXXXXXRFTISRDNAKNTVYLQMNSLKPEDTAVYYCXXXXXXXXXXWGQGTQVTVSS' 

25} 

26 

27 

28# Save current stderr to further restore it 

29stderr_backup = sys.stderr 

30# Suppress stderr 

31#sys.stderr = None 

32# Import the function 

33#from Bio.SubsMat import MatrixInfo 

34# Restore stderr 

35#sys.stderr = stderr_backup 

36 

37# AGUS: Necesitamos que la secuencia de aa alineada tenga gaps identificados con guiones y pairwaise2 (biopython < 1.80) no lo hace 

38# AGUS: Para actualizar a biopython >= 1.80 pairwaise2 ya no existe y no podemos obtener el mismo outoput que necesitábamos 

39# AGUS: Esta función parece resolver el problema: https://github.com/biopython/biopython/issues/4769 

40# Set the aligner 

41aligner = Align.PairwiseAligner(mode='local') 

42aligner.substitution_matrix = substitution_matrices.load("BLOSUM62") 

43aligner.open_gap_score = -10 

44aligner.extend_gap_score = -0.5 

45def add_leading_and_trailing_gaps(alignment: Alignment) -> Alignment: 

46 coords = alignment.coordinates 

47 

48 # We need to add two columns on each side of the coordinates matrix. 

49 # The first column will always point to the start of the sequences, 

50 # hence filled with zeros. 

51 new_coords = np.zeros((2, coords.shape[1] + 4), dtype=int) 

52 

53 # The last w column will always point to the end of sequences, 

54 # hence filled with the length of the respective sequences. 

55 target_len = len(alignment.sequences[0]) 

56 query_len = len(alignment.sequences[1]) 

57 last_col = np.array([target_len, query_len]) 

58 new_coords[:, -1] = last_col 

59 

60 # The middle columns will contain the original coordinates. 

61 new_coords[:, 2:-2] = coords 

62 

63 # The second and one before last columns will point to the end of sequence, 

64 # with less unaligned positions. 

65 new_coords[:, 1] = coords[:, 0] - coords[:, 0].min() 

66 new_coords[:, -2] = coords[:, -1] + (last_col - coords[:, -1]).min() 

67 return Alignment(sequences=alignment.sequences, coordinates=new_coords) 

68 

69# Map the structure aminoacids sequences against the standard reference sequences 

70# References are uniprot accession ids and they are optional 

71# For each reference, align the reference sequence with the topology sequence 

72# Chains which do not match any reference sequence will be blasted 

73# Note that an internet connection is required both to retireve the uniprot reference sequence and to do the blast 

74# NEVER FORGET: This system relies on the fact that topology chains are not repeated 

75def generate_protein_mapping ( 

76 structure : 'Structure', 

77 protein_references_file : 'File', 

78 database : 'Database', 

79 cache : 'Cache', 

80 register : dict, 

81 mercy : list[str] = [], 

82 input_protein_references : list | dict = [], 

83 pdb_ids : list[str] = [], 

84) -> dict: 

85 """ Map the structure aminoacids sequences against the Uniprot reference sequences. """ 

86 # Remove previous warnings, if any 

87 register.remove_warnings(REFERENCE_SEQUENCE_FLAG) 

88 # Forced references must be list or dict 

89 # If it is none then we set it as an empty list 

90 if input_protein_references == None: 

91 input_protein_references = [] 

92 # If forced references is a list of dictionaries then it means the input is wrongly formatted 

93 # This may happen since the inputs file is in YAML format, and a simple hyphen makes the difference 

94 # We can fix it from here anyway 

95 if type(input_protein_references) == list and len(input_protein_references) > 0 and type(input_protein_references[0]) == dict: 

96 input_protein_references = { k: v for fr in input_protein_references for k, v in fr.items() } 

97 # Check if the forced references are strict (i.e. reference per chain, as a dictionary) or flexible (list of references) 

98 strict_references = type(input_protein_references) == dict 

99 # Check the "no referable" flag not to be passed when references are not strict 

100 if not strict_references and NO_REFERABLE_FLAG in input_protein_references: 

101 raise InputError(' The "no referable" flag cannot be passed in a list.' \ 

102 f' You must use a chain keys dictionary (e.g. {"A":"{NO_REFERABLE_FLAG}"})') 

103 # Store all the references which are got through this process 

104 # Note that not all references may be used at the end 

105 references = {} 

106 # Given a uniprot accession, get the reference object 

107 # Try first asking to the MDposit database in case the reference exists already 

108 # If not, retrieve UniProt data and build the reference object 

109 # Return also a boolean to set if the reference already existed (True) or not (False) 

110 def get_reference (uniprot_accession : str) -> tuple[dict, bool]: 

111 reference = references.get(uniprot_accession, None) 

112 if reference: 

113 return reference, True 

114 reference = database.get_reference_data('proteins', uniprot_accession) 

115 if reference: 

116 return reference, True 

117 reference = get_uniprot_reference(uniprot_accession) 

118 return reference, False 

119 # Import local references, in case the references json file already exists 

120 imported_references = None 

121 if protein_references_file.exists: 

122 imported_references = import_references(protein_references_file) 

123 # Append the imported references to the overall references pool 

124 for k,v in imported_references.items(): 

125 references[k] = v 

126 # Get the structure chain sequences 

127 parsed_chains = get_parsed_chains(structure) 

128 # Find out which chains are protein 

129 protein_parsed_chains = [] 

130 for chain_data in parsed_chains: 

131 sequence = chain_data['sequence'] 

132 if next((letter for letter in sequence if letter != 'X'), None): 

133 chain_data['match'] = { 'ref': None, 'map': None, 'score': 0 } 

134 protein_parsed_chains.append(chain_data) 

135 # If there are no protein sequences then there is no need to map anything 

136 if len(protein_parsed_chains) == 0: 

137 print(' There are no protein sequences') 

138 return protein_parsed_chains 

139 # For each input forced reference, get the reference sequence 

140 reference_sequences = {} 

141 # Save already tried alignments to not repeat the alignment further 

142 tried_alignments = { chain_data['name']: [] for chain_data in protein_parsed_chains } 

143 # Set a function to try to match all protein sequences with the available reference sequences 

144 # In case of match, objects in the 'protein_parsed_chains' list are modified by adding the result 

145 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not 

146 def match_sequences () -> bool: 

147 # Track each chain-reference alignment match and keep the score of successful alignments 

148 # Now for each structure sequence, align all reference sequences and keep the best alignment (if it meets the minimum) 

149 for chain_data in protein_parsed_chains: 

150 chain = chain_data['name'] 

151 chain_tried_alignments = tried_alignments[chain] 

152 # In case references are forced per chain check if there is a reference for this chain and match according to this 

153 if strict_references: 

154 # Get the forced specific chain for this sequence, if any 

155 forced_reference = input_protein_references.get(chain, None) 

156 if forced_reference: 

157 # If the chain has a specific forced reference then we must align it just once 

158 # Skip this process in further matches 

159 if chain_data['match']['ref']: 

160 continue 

161 # In case the forced reference is the "no referable" flag 

162 # Thus it has no reference sequence and we must not try to match it 

163 # Actually, any match would be accidental and not correct 

164 if forced_reference == NO_REFERABLE_FLAG: 

165 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG } 

166 continue 

167 # In case the forced reference is the "not found" flag 

168 # This should not happend but we may be using references as forced references, so just in case 

169 if forced_reference == NOT_FOUND_FLAG: 

170 chain_data['match'] = { 'ref': NOT_FOUND_FLAG } 

171 continue 

172 # Get the forced reference sequence and align it to the chain sequence in order to build the map 

173 reference_sequence = reference_sequences[forced_reference] 

174 print(f' Aligning chain {chain} with {forced_reference} reference sequence') 

175 align_results = align(reference_sequence, chain_data['sequence']) 

176 # The align must match or we stop here and warn the user 

177 if not align_results: 

178 raise InputError(f'Forced reference {chain} -> {forced_reference} does not match in sequence') 

179 sequence_map, align_score = align_results 

180 reference = references[forced_reference] 

181 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score } 

182 continue 

183 # If the chain has already a match and this match is among the forced references then stop here 

184 # Forced references have priority and this avoids having a match with a not forced reference further 

185 # Same behaviour if the match is with an unreferable sequence 

186 if chain_data['match']['ref'] and ( chain_data['match']['ref'] == NO_REFERABLE_FLAG 

187 or chain_data['match']['ref'] == NOT_FOUND_FLAG 

188 or chain_data['match']['ref']['uniprot'] in input_protein_references): 

189 continue 

190 # Iterate over the different available reference sequences 

191 for uniprot_id, reference_sequence in reference_sequences.items(): 

192 # If this alignment has been tried already then skip it 

193 if uniprot_id in chain_tried_alignments: 

194 continue 

195 # Align the structure sequence with the reference sequence 

196 # NEVER FORGET: This system relies on the fact that topology chains are not repeated 

197 print(f' Aligning chain {chain} with {uniprot_id} reference sequence') 

198 align_results = align(reference_sequence, chain_data['sequence']) 

199 tried_alignments[chain].append(uniprot_id) # Save the alignment try, no matter if it works or not 

200 if not align_results: 

201 continue 

202 # In case we have a valid alignment, check the alignment score is better than the current reference score (if any) 

203 sequence_map, align_score = align_results 

204 current_reference = chain_data['match'] 

205 if current_reference['score'] > align_score: 

206 continue 

207 # If the match is a 'no referable' exception then set a no referable flag 

208 if type(uniprot_id) == NoReferableException: 

209 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG } 

210 continue 

211 # Proceed to set the corresponding reference toherwise 

212 reference = references[uniprot_id] 

213 # If the alignment is better then we impose the new reference 

214 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score } 

215 # Sum up the current matching 

216 print(' Protein reference summary:') 

217 for chain_data in parsed_chains: 

218 name = chain_data['name'] 

219 match = chain_data.get('match', None) 

220 if not match: 

221 print(f' {name} -> Not protein') 

222 continue 

223 reference = chain_data['match'].get('ref', None) 

224 if not reference: 

225 print(f' {name} -> ¿?') 

226 continue 

227 if reference == NO_REFERABLE_FLAG: 

228 print(f' {name} -> No referable') 

229 continue 

230 if reference == NOT_FOUND_FLAG: 

231 print(f' {name} -> Not found') 

232 continue 

233 uniprot_id = reference['uniprot'] 

234 print(f' {name} -> {uniprot_id}') 

235 # Export already matched references 

236 export_references(protein_parsed_chains, protein_references_file) 

237 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not 

238 allright = all([ chain_data['match']['ref'] for chain_data in protein_parsed_chains ]) 

239 # If we match all chains then make sure there is no forced reference missing which did not match 

240 # Otherwise stop here and force the user to remove these forced uniprot ids from the inputs file 

241 # WARNING: Although we could move on it is better to stop here and warn the user to prevent a future silent problem 

242 if allright and input_protein_references: 

243 # Get forced uniprot ids 

244 forced_uniprot_ids = set(list(input_protein_references.values()) if strict_references else input_protein_references) 

245 forced_uniprot_ids -= { NOT_FOUND_FLAG, NO_REFERABLE_FLAG } 

246 #forced_uniprot_ids.remove(NO_REFERABLE_FLAG) 

247 #forced_uniprot_ids.remove(NOT_FOUND_FLAG) 

248 # Get matched uniprot ids 

249 matched_references = [ chain_data['match']['ref'] for chain_data in protein_parsed_chains ] 

250 matched_uniprot_ids = set([ ref['uniprot'] for ref in matched_references if type(ref) == dict ]) 

251 # Check the difference 

252 unmatched_uniprot_ids = forced_uniprot_ids - matched_uniprot_ids 

253 if len(unmatched_uniprot_ids) > 0: 

254 log = ', '.join(unmatched_uniprot_ids) 

255 raise InputError(f'Some forced references were not matched with any protein sequence: {log}\n' 

256 ' Please remove them from the inputs file') 

257 return allright 

258 # --- End of match_sequences function -------------------------------------------------------------------------------- 

259 # First use the forced references for the matching 

260 if input_protein_references: 

261 forced_uniprot_ids = list(input_protein_references.values()) if strict_references else input_protein_references 

262 for uniprot_id in forced_uniprot_ids: 

263 # If instead of a uniprot id there is a 'no referable' flag then we skip this process 

264 if uniprot_id == NO_REFERABLE_FLAG: 

265 continue 

266 # If instead of a uniprot id there is a 'not found' flag then we skip this process 

267 # This should not happend but we may be using references as forced references, so just in case 

268 if uniprot_id == NOT_FOUND_FLAG: 

269 continue 

270 # If reference is already in the list (i.e. it has been imported) then skip this process 

271 reference = references.get(uniprot_id, None) 

272 if reference: 

273 reference_sequences[uniprot_id] = reference['sequence'] 

274 continue 

275 # Find the reference data for the given uniprot id 

276 reference, already_loaded = get_reference(uniprot_id) 

277 reference_sequences[uniprot_id] = reference['sequence'] 

278 # Save the current whole reference object for later 

279 references[reference['uniprot']] = reference 

280 # Now that we have all forced references data perform the matching 

281 # If we have every protein chain matched with a reference then we stop here 

282 print(' Using only forced references from the inputs file') 

283 if match_sequences(): 

284 return protein_parsed_chains 

285 # Now add the imported references to reference sequences. Thus now they will be 'matchable' 

286 # Thus now they will be 'matchable', so try to match sequences again in case any of the imported references has not been tried 

287 # It was not done before since we want forced references to have priority 

288 if imported_references: 

289 need_rematch = False 

290 for uniprot_id, reference in imported_references.items(): 

291 # If the imported reference has been aligned already (i.e. it was a forced reference) 

292 if uniprot_id in reference_sequences: 

293 continue 

294 # Otherwise, include it 

295 need_rematch = True 

296 reference_sequences[uniprot_id] = reference['sequence'] 

297 # If there was at least one imported reference missing then rerun the matching 

298 if need_rematch: 

299 print(' Using also references imported from references.json') 

300 if match_sequences(): 

301 return protein_parsed_chains 

302 # If there are still any chain which is not matched with a reference then we need more references 

303 # To get them, retrieve all uniprot ids associated to the pdb ids, if any 

304 if pdb_ids and len(pdb_ids) > 0: 

305 for pdb_id in pdb_ids: 

306 # Ask PDB 

307 uniprot_ids = pdb_to_uniprot(pdb_id) 

308 for uniprot_id in uniprot_ids: 

309 # If this is not an actual UniProt, but a no referable exception, then handle it 

310 # We must find the matching sequence and set the corresponding chain as no referable 

311 if type(uniprot_id) == NoReferableException: 

312 reference_sequences[uniprot_id] = uniprot_id.sequence 

313 continue 

314 # Build a new reference from the resulting uniprot 

315 reference, already_loaded = get_reference(uniprot_id) 

316 if reference == None: 

317 continue 

318 reference_sequences[reference['uniprot']] = reference['sequence'] 

319 # Save the current whole reference object for later 

320 references[reference['uniprot']] = reference 

321 # If we have every protein chain matched with a reference already then we stop here 

322 print(' Using also references related to PDB ids from the inputs file') 

323 if match_sequences(): 

324 return protein_parsed_chains 

325 # If there are still any chain which is not matched with a reference then we need more references 

326 # To get them, we run a blast with each orphan chain sequence 

327 for chain_data in protein_parsed_chains: 

328 # Skip already references chains 

329 if chain_data['match']['ref']: 

330 continue 

331 # Run the blast 

332 sequence = chain_data['sequence'] 

333 uniprot_id = blast(sequence, cache) 

334 if not uniprot_id: 

335 chain_data['match'] = { 'ref': NOT_FOUND_FLAG } 

336 continue 

337 # Build a new reference from the resulting uniprot 

338 reference, already_loaded = get_reference(uniprot_id) 

339 reference_sequences[reference['uniprot']] = reference['sequence'] 

340 # Save the current whole reference object for later 

341 references[reference['uniprot']] = reference 

342 # If we have every protein chain matched with a reference already then we stop here 

343 print(' Using also references from blast') 

344 if match_sequences(): 

345 return protein_parsed_chains 

346 # At this point we should have macthed all sequences 

347 # If not, kill the process unless mercy was given 

348 must_be_killed = REFERENCE_SEQUENCE_FLAG not in mercy 

349 if must_be_killed: 

350 raise InputError('BLAST failed to find a matching reference sequence for at least one protein sequence. See the warnings above.\n' + \ 

351 ' If your system has antibodies or synthetic constructs please consider marking these chains as "no referable" in the inputs file.\n' + \ 

352 ' If your system has exotic proteins whose sequences are not found in the Swiss-Prot database you may force non-curated UniProt ids.\n' + \ 

353 ' If your system has very exotic proteins whose sequence are not in UniProt you can use the "--mercy refseq" flag to skip this error.') 

354 warn('BLAST failed to find a matching reference sequence for at least one protein sequence') 

355 register.add_warning(REFERENCE_SEQUENCE_FLAG, 'There is at least one protein region which is not mapped to any reference sequence') 

356 return protein_parsed_chains 

357 

358# Export reference objects data to a json file 

359# This file is used by the loader to load new references to the database 

360# Note that all references are saved to this json file, even those which are already in the database 

361# It is the loader who is the responsible to check which references must be loaded and which ones are loaded already 

362# Note that mapping data (i.e. which residue belongs to each reference) is not saved 

363def export_references (mapping_data : list, protein_references_file : 'File'): 

364 final_references = [] 

365 final_uniprots = [] 

366 for data in mapping_data: 

367 match = data['match'] 

368 ref = match['ref'] 

369 if not ref or ref == NO_REFERABLE_FLAG or ref == NOT_FOUND_FLAG: 

370 continue 

371 uniprot = ref['uniprot'] 

372 if uniprot in final_uniprots: 

373 continue 

374 final_references.append(ref) 

375 final_uniprots.append(uniprot) 

376 # If there are no references to exported then do not egenrate the json file 

377 if len(final_references) == 0: 

378 return 

379 # Write references to a json file 

380 save_json(final_references, protein_references_file.path, indent = 4) 

381 

382# Import reference json file so we do not have to rerun this process 

383def import_references (protein_references_file : 'File') -> list: 

384 print(' Importing references from ' + protein_references_file.path) 

385 # Read the file 

386 file_content = load_json(protein_references_file.path) 

387 # Format data as the process expects to find it 

388 references = {} 

389 for reference in file_content: 

390 uniprot = reference['uniprot'] 

391 references[uniprot] = reference 

392 return references 

393 

394# Get each chain name and aminoacids sequence in a topology 

395# Output format example: [ { 'sequence': 'VNLTT', 'indices': [1, 2, 3, 4, 5] }, ... ] 

396def get_parsed_chains (structure : 'Structure') -> list: 

397 parsed_chains = [] 

398 chains = structure.chains 

399 for chain in chains: 

400 name = chain.name 

401 sequence = '' 

402 residue_indices = [] 

403 for residue in chain.residues: 

404 letter = protein_residue_name_to_letter(residue.name) 

405 sequence += letter 

406 residue_indices.append(residue.index) 

407 sequence_object = { 'name': name, 'sequence': sequence, 'residue_indices': residue_indices } 

408 parsed_chains.append(sequence_object) 

409 return parsed_chains 

410 

411# Align two aminoacid sequences 

412# Return a list with the reference residue indexes (values) 

413# which match each new sequence residues indexes (indexes) 

414# Return also the score of the alignment 

415# Return None when there is not valid alignment at all 

416# Set verbose = True to see a visual summary of the sequence alignments in the logs 

417def align (ref_sequence : str, new_sequence : str, verbose : bool = False) -> Optional[ tuple[list, float] ]: 

418 

419 #print('- REFERENCE\n' + ref_sequence + '\n- NEW\n' + new_sequence) 

420 

421 # If the new sequence is all 'X' stop here, since this would make the alignment infinite 

422 # Then an array filled with None is returned 

423 if all([ letter == 'X' for letter in new_sequence ]): 

424 return None 

425 

426 # Return the new sequence as best aligned as possible with the reference sequence 

427 #alignments = pairwise2.align.localds(ref_sequence, new_sequence, MatrixInfo.blosum62, -10, -0.5) 

428 # DANI: Habría que hacerlo de esta otra forma según el deprecation warning (arriba hay más código) 

429 # DANI: El problema es que el output lo tiene todo menos la sequencia en formato alienada 

430 # DANI: i.e. formato '----VNLTT' (con los gaps identificados con guiones), que es justo el que necesito 

431 # Actualized code for biopython >= 1.80 

432 alignments = aligner.align(ref_sequence, new_sequence) 

433 

434 # In case there are no alignments it means the current chain has nothing to do with this reference 

435 # Then an array filled with None is returned 

436 if len(alignments) == 0: 

437 return None 

438 

439 # Several alignments may be returned, specially when it is a difficult or impossible alignment 

440 # Output format example: '----VNLTT' 

441 best_alignment = alignments[0] 

442 best_alignment_with_gaps = add_leading_and_trailing_gaps(best_alignment) 

443 # Extract the allignment sequences 

444 #aligned_ref, aligned_sequence = best_alignment_with_gaps.sequences 

445 aligned_ref = best_alignment_with_gaps[0] # AGUS: Puede ser útil en el futuro 

446 aligned_sequence = best_alignment_with_gaps[1] 

447 # Obtain the score of the alignment 

448 score = best_alignment.score 

449 # Calculate the normalized score 

450 normalized_score = score / len(new_sequence) 

451 

452 # If the normalized score does not reaches the minimum we consider the alignment is not valid 

453 # It may happen when the reference goes for a specific chain but we must map all chains 

454 # This 1 has been found experimentally 

455 # Non maching sequence may return a 0.1-0.3 normalized score 

456 # Matching sequence may return >4 normalized score 

457 if normalized_score < 2: 

458 print(' Not valid alignment') 

459 return None 

460 

461 # Tell the user about the success 

462 beautiful_normalized_score = round(normalized_score * 100) / 100 

463 print(f' Valid alignment -> Normalized score = {beautiful_normalized_score}') 

464 

465 # Match each residue 

466 aligned_mapping = [] 

467 aligned_index = 0 

468 for l, letter in enumerate(aligned_sequence): 

469 # Guions are skipped 

470 if letter == '-': 

471 continue 

472 # Get the current residue of the new sequence 

473 equivalent_letter = new_sequence[aligned_index] 

474 if not letter == equivalent_letter: 

475 raise RuntimeError(f'Something was wrong at position {l} :S') 

476 # 'X' residues cannot be mapped since reference sequences should never have any 'X' 

477 if letter == 'X': 

478 aligned_mapping.append(None) 

479 aligned_index += 1 

480 continue 

481 # Otherwise add the equivalent aligned index to the mapping 

482 # WARNING: Add +1 since uniprot residue counts start at 1, not 0 

483 aligned_mapping.append(l + 1) 

484 aligned_index += 1 

485 

486 return aligned_mapping, normalized_score 

487 

488# Given an aminoacids sequence, return a list of uniprot ids 

489# Note that we are blasting against UniProtKB / Swiss-Prot so results will always be valid UniProt accessions 

490# WARNING: This always means results will correspond to curated entries only 

491# If your sequence is from an exotic organism the result may be not from it but from other more studied organism 

492# Since this function may take some time we always cache the result 

493def blast (sequence : str, cache : Optional['Cache'] = None) -> Optional[str]: 

494 # Check if we have the resulted saved in cache already 

495 if cache != None: 

496 cached_blasts = cache.retrieve('blasts', {}) 

497 if sequence in cached_blasts: 

498 accession = cached_blasts[sequence] 

499 print(f'Using previous blast result from cache: {accession}') 

500 return accession 

501 print('Throwing blast...') 

502 result = NCBIWWW.qblast( 

503 program = "blastp", 

504 database = "swissprot", # UniProtKB / Swiss-Prot 

505 sequence = sequence, 

506 ) 

507 parsed_result = xmltodict.parse(result.read()) 

508 hits = parsed_result['BlastOutput']['BlastOutput_iterations']['Iteration']['Iteration_hits'] 

509 # When there is no result return None 

510 # Note that this is possible although hardly unprobable 

511 if not hits: 

512 return None 

513 # Get the first result only 

514 # Note that when there is only one result the Hit isnot an list, but the hit itself 

515 results = hits['Hit'] 

516 if type(results) == list: first_result = results[0] 

517 elif type(results) == dict: first_result = results 

518 else: raise RuntimeError('Invalid hit format') 

519 # Return the accession 

520 # DANI: Si algun día tienes problemas porque te falta el '.1' al final del accession puedes sacarlo de Hit_id 

521 accession = first_result['Hit_accession'] 

522 print('Result: ' + accession) 

523 # Save the result in the cache 

524 if cache != None: 

525 cached_blasts[sequence] = accession 

526 cached_blasts = cache.update('blasts', cached_blasts) 

527 return accession 

528 

529# Given a uniprot accession, use the uniprot API to request its data and then mine what is needed for the database 

530def get_uniprot_reference (uniprot_accession : str) -> Optional[dict]: 

531 # Request Uniprot 

532 request_url = 'https://www.ebi.ac.uk/proteins/api/proteins/' + uniprot_accession 

533 parsed_response = None 

534 request = urllib.request.Request(request_url) 

535 # One day the API was returning a 'Secure Connection Failed' error and this header fixed the problem 

536 request.add_header('Referer', 'https://www.uniprot.org/') 

537 try: 

538 with urllib.request.urlopen(request) as response: 

539 parsed_response = json.loads(response.read().decode("utf-8")) 

540 # If the accession is not found in UniProt then the id is not valid 

541 except urllib.error.HTTPError as error: 

542 if error.code == 404: 

543 warn('Cannot find UniProt entry for accession ' + uniprot_accession) 

544 return None 

545 print('Error when requesting ' + request_url) 

546 raise RuntimeError(f'Something went wrong with the Uniprot request (error {error.code})') 

547 except: 

548 print('Error when requesting ' + request_url) 

549 raise RuntimeError(f'Something went very wrong with the Uniprot request') 

550 # If we have not a response at this point then it may mean we are trying to access an obsolete entry (e.g. P01607) 

551 if parsed_response == None: 

552 warn('Cannot find UniProt entry for accession ' + uniprot_accession) 

553 return None 

554 # Get the full protein name 

555 protein_data = parsed_response['protein'] 

556 protein_name_data = protein_data.get('recommendedName', None) 

557 # DANI: It is possible that the 'recommendedName' is missing if it is not a reviewed UniProt entry 

558 if not protein_name_data: 

559 warn(f'The UniProt accession {uniprot_accession} is missing the recommended name. You should consider changing the reference.') 

560 protein_name_data = protein_data.get('submittedName', None)[0] 

561 if not protein_name_data: 

562 raise ValueError('Unexpected structure in UniProt response for accession ' + uniprot_accession) 

563 protein_name = protein_name_data['fullName']['value'] 

564 # Get the gene names as a single string 

565 gene_names = [] 

566 # WARNING: Some uniprot entries are missing gene names (e.g. P00718) 

567 # WARNING: Some uniprot entries have names in non-canonical keys (e.g. orfNames, olnNames) 

568 # These names are not shown event in the uniprot web page so we also skip them 

569 genes = parsed_response.get('gene', []) 

570 for gene in genes: 

571 gene_name = gene.get('name', None) 

572 if not gene_name: 

573 continue 

574 gene_names.append(gene_name['value']) 

575 gene_names = ', '.join(gene_names) if len(gene_names) > 0 else None 

576 # Get the organism name 

577 organism = parsed_response['organism']['names'][0]['value'] 

578 # Get the aminoacids sequence 

579 sequence = parsed_response['sequence']['sequence'] 

580 # Get interesting regions to be highlighted in the client 

581 domains = [] 

582 # WARNING: Some uniprot entries are missing features (e.g. O27908) 

583 features = parsed_response.get('features', []) 

584 # Get comments data which may be useful to further build domain descriptions 

585 comments_data = parsed_response.get('comments', None) 

586 # There are many types of features but in this case we will focus on domanins and similar 

587 target_types = ['CHAIN', 'REGION', 'DOMAIN', 'MOTIF', 'SITE'] 

588 for feature in features: 

589 # Skip features of other types 

590 if feature['type'] not in target_types: 

591 continue 

592 # Get the domain name 

593 name = feature['description'] 

594 # Build the domain description from the coments data 

595 description = None 

596 if comments_data: 

597 comments = [ comment for comment in parsed_response['comments'] if name == comment.get('molecule', None) ] 

598 comment_text = [] 

599 for comment in comments: 

600 if not comment.get('text', False): continue 

601 text = comment.get('text', None) 

602 if text == None: raise ValueError('Unexpected UniProt response format: no text in comment') 

603 # DANI: el comment 'text' casi siempre es una lista 

604 # DANI: solo tengo constancia de una vez en que era un string directamente 

605 # DANI: en uno de los comentarios de https://www.ebi.ac.uk/proteins/api/proteins/Q15465 

606 if type(text) == str: comment_text.append(text) 

607 elif type(text) == list: comment_text.append(text[0]['value']) 

608 else: raise ValueError('Unexpected UniProt response format: text in comment is neither str or list') 

609 description = '\n\n'.join(comment_text) 

610 # Set the domain selection 

611 # The domain 'begin' and 'end' values may include non-numeric symbols such as '~', '>' or '<' 

612 # These values are usually ignored or replaced by '?' in the UniProt web client 

613 # There may be not numeric value at all (e.g. Q99497) 

614 # In these cases uniprot shows the domain but it does not allow to use its functionallities 

615 # e.g. you can not blast using the domain sequence 

616 # It makes not sense having a domain with unkown range to me so we skip these domains 

617 begin = feature['begin'].replace('~', '').replace('>', '').replace('<', '') 

618 end = feature['end'].replace('~', '').replace('>', '').replace('<', '') 

619 if begin == '' or end == '': 

620 continue 

621 selection = begin + '-' + end 

622 # If we already have a domain with the same name then join both domains 

623 # For instance, you may have several repetitions of the 'Disordered' region 

624 already_existing_domain = next((domain for domain in domains if domain['name'] == name), None) 

625 if already_existing_domain: 

626 already_existing_domain['selection'] += ', ' + selection 

627 continue 

628 # Otherwise, create a new domain 

629 domain = { 

630 'name': name, 

631 'selection': selection 

632 } 

633 # Add a description only if we succesfully mined it 

634 # Note that only features tagged as CHAIN have comments (not sure about this) 

635 if description: 

636 domain['description'] = description 

637 # Add the new domain to the list 

638 domains.append(domain) 

639 # Mine protein functions from Gene Ontology references 

640 # Get database references 

641 db_references = parsed_response['dbReferences'] 

642 # Get references from Gene Ontology (GO) only 

643 go_references = [ ref for ref in db_references if ref['type'] == 'GO' ] 

644 # A Gene Ontology entry may be one of three types: 

645 # Cellular Component (C), Molecular Function (F) and Biological Process (P) 

646 # In this case we are interested in protein function only so we will keep those with the 'F' header 

647 functions = [ ref['properties']['term'][2:] for ref in go_references if ref['properties']['term'][0:2] == 'F:' ] 

648 # Set the final reference to be uploaded to the database 

649 return { 

650 'name': protein_name, 

651 'gene': gene_names, 

652 'organism': organism, 

653 'uniprot': uniprot_accession, 

654 'sequence': sequence, 

655 'domains': domains, 

656 'functions': functions 

657 } 

658 

659# Given a pdb Id, get its uniprot id 

660# e.g. 6VW1 -> Q9BYF1, P0DTC2, P59594 

661def pdb_to_uniprot (pdb_id : str) -> list[ str | NoReferableException ]: 

662 # Set the request query 

663 query = '''query ($id: String!) { 

664 entry(entry_id: $id) { 

665 polymer_entities { 

666 rcsb_polymer_entity_container_identifiers { uniprot_ids } 

667 rcsb_entity_source_organism { scientific_name } 

668 entity_poly { pdbx_seq_one_letter_code } 

669 } 

670 } 

671 }''' 

672 # Request PDB data 

673 parsed_response = request_pdb_data(pdb_id, query) 

674 # Mine data 

675 uniprot_ids = [] 

676 # WARNING: Polymers do not come in the same order than in PDB 

677 # WARNING: You will not know the entity number by enumerating them 

678 for polymer in parsed_response['polymer_entities']: 

679 # Get the aminoacids sequence of this polymer (or chain) 

680 entity = polymer.get('entity_poly', None) 

681 sequence = entity.get('pdbx_seq_one_letter_code', None) if entity else None 

682 if sequence: 

683 # WARNING: some polymers/chains may have a "special" sequence 

684 # It may combine one-letter code with 3-letter code in parenthesis for special aminoacids 

685 # e.g. 5JMO, entity 3 -> (DKA)RVK(AR7)(0QE) 

686 # e.g. 6ME2, entity 1 -> ... DRYLYI(YCM)HSLKYD ... 

687 # e.g. nucleic acids -> (DC)(DA)(DA)(DC)(DC)(DG)(DC)(DA)(DA)(DC) 

688 # We simply replace these special aminoacids by X 

689 sequence = re.sub(r'\([0-9A-Z]{2,3}\)', 'X', sequence) 

690 # Get the uniprot ids associated to this polymer (or chain) 

691 identifier = polymer['rcsb_polymer_entity_container_identifiers'] 

692 uniprots = identifier.get('uniprot_ids', None) 

693 # If there are not UniProt ids in this entity then it may be no referable 

694 # If we have a no referable entity then we must return an exception with its sequence 

695 # Beware that nucleic acids also fall in this section 

696 if not uniprots: 

697 # If this polymer, whatever it is, has not sequence then there is nothing we can do 

698 if not sequence: continue 

699 # Nueclic acid sequences should be all X at this point 

700 if all(letter == 'X' for letter in sequence): continue 

701 # Get the organisms 

702 organisms = polymer.get('rcsb_entity_source_organism', None) 

703 # Some synthetic constructs may have not defined organisms at all 

704 # e.g. 3H11, entity 3 

705 if not organisms: 

706 uniprot_ids.append( NoReferableException(sequence) ) 

707 continue 

708 # Get scientific names in lower caps since sometimes they are all upper caps 

709 scientific_names = set( 

710 [ (organism.get('scientific_name') or '').lower() for organism in organisms ]) 

711 # If we have a synthetic construct then flag the sequence as no referable 

712 if 'synthetic construct' in scientific_names: 

713 uniprot_ids.append( NoReferableException(sequence) ) 

714 continue 

715 # Check if the sequence of this chain may belong to an antibody 

716 print(' Could this be an antibody?') 

717 is_antibody = False 

718 for reference_sequence in REFERENCE_ANTIBODY_SEQUENCES: 

719 if align(reference_sequence, sequence): 

720 is_antibody = True 

721 break 

722 print(f' I guess it {"is" if is_antibody else "is not"} an antibody') 

723 # If so, the also set this chain as no referable since antibodies have no UniProt id 

724 if is_antibody: 

725 uniprot_ids.append( NoReferableException(sequence) ) 

726 continue 

727 # If we did not fall in any of the previous sections then continue 

728 continue 

729 # If we have multiple uniprots in a single entity then we skip them 

730 # Note tha they belong to an entity which is no referable (e.g. a chimeric entity) 

731 # See 5GY2 and 7E2Z labeled as chimeric entities and 6e67, which is not labeled likewise 

732 if len(uniprots) > 1: 

733 warn(f'Multiple UniProt ids in the same entity in {pdb_id} -> Is this a chimeric entity?') 

734 if not sequence: raise ValueError(f'Missing sequence with multiple UniProt ids in {pdb_id}') 

735 uniprot_ids.append( NoReferableException(sequence) ) 

736 continue 

737 # Normally, a polymer/chain will have one UniProt id only 

738 uniprot_id = uniprots[0] 

739 uniprot_ids.append(uniprot_id) 

740 # Count how many UniProt ids we found 

741 actual_uniprot_ids = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == str ] 

742 if len(actual_uniprot_ids) > 0: 

743 print(f' UniProt ids for PDB id {pdb_id}: ' + ', '.join(actual_uniprot_ids)) 

744 else: print(f' No UniProt ids were found for PDB id {pdb_id}') 

745 # Count how many no referable sequences we found 

746 no_refs = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == NoReferableException ] 

747 no_refs_count = len(no_refs) 

748 if no_refs_count > 0: 

749 print(f' Also encountered {no_refs_count} no refereable sequences for PDB id {pdb_id}') 

750 # Return them all 

751 return uniprot_ids 

752 

753# This function is used by the generate_metadata script 

754# 1. Get structure sequences 

755# 2. Calculate which reference domains are covered by the previous sequence 

756# 3. In case it is a covid spike, align the previous sequence against all saved variants (they are in 'utils') 

757from mddb_workflow.resources.covid_variants import covid_spike_variants 

758def get_sequence_metadata (structure : 'Structure', protein_references_file : 'File', residue_map : dict) -> dict: 

759 # Mine sequences from the structure 

760 sequences = [] 

761 # Classify sequences according to if they belong to protein or nucleic sequences 

762 # WARNING: We are interested in unique sequence BUT we also want to keep the order 

763 # WARNING: Do NOT use sets here to the order of appearance in the structure is respected 

764 protein_sequences = [] 

765 nucleic_sequences = [] 

766 # Iterate structure chains 

767 for chain in structure.chains: 

768 # Get the current chain sequence and add it to the list 

769 sequence = chain.get_sequence() 

770 sequences.append(sequence) 

771 # Get the chain classification 

772 classification = chain.get_classification() 

773 # Depending on what it is, add the sequence also in the corresponding list 

774 if classification == 'protein' and sequence not in protein_sequences: 

775 protein_sequences.append(sequence) 

776 elif (classification == 'dna' or classification == 'rna') and sequence not in nucleic_sequences: 

777 nucleic_sequences.append(sequence) 

778 # Get values from the residue map 

779 # Get protein references from the residues map 

780 reference_ids = [] 

781 references = residue_map['references'] 

782 if references and len(references) > 0: 

783 for ref, ref_type in zip(references, residue_map['reference_types']): 

784 if ref_type == 'protein': 

785 reference_ids.append(ref) 

786 residue_reference_numbers = residue_map['residue_reference_numbers'] 

787 residue_reference_indices = residue_map['residue_reference_indices'] 

788 # Load references data, which should already be save to the references data file 

789 references_data = import_references(protein_references_file) if protein_references_file.exists else {} 

790 # In case we have the SARS-CoV-2 spike among the references, check also which is the variant it belongs to 

791 # DANI: Esto es un parche. En un futuro buscaremos una manera mejor de comprovar variantes en cualquier contexto 

792 variant = None 

793 covid_spike_reference_id = 'P0DTC2' 

794 if covid_spike_reference_id in reference_ids: 

795 # Load the reference data and find a chain which belongs to the spike 

796 # We consider having only one variant, so all chains should return the same result 

797 reference_data = references_data[covid_spike_reference_id] 

798 covid_spike_reference_index = reference_ids.index(covid_spike_reference_id) 

799 sample_residue_index = next((residue_index for residue_index, reference_index in enumerate(residue_reference_indices) if reference_index == covid_spike_reference_index), None) 

800 if sample_residue_index == None: 

801 raise RuntimeError('Failed to find residue belonging to SARS-CoV-2 variant') 

802 sample_chain_sequence = structure.residues[sample_residue_index].chain.get_sequence() 

803 # Align the sample chain sequence against all variants to find the best match 

804 highest_score = 0 

805 print('Finding SARS-CoV-2 variant') 

806 for variant_name, variant_sequence in covid_spike_variants.items(): 

807 print(f' Trying with {variant_name}') 

808 align_results = align(variant_sequence, sample_chain_sequence) 

809 if not align_results: 

810 continue 

811 mapping, score = align_results 

812 if score > highest_score: 

813 highest_score = score 

814 variant = variant_name 

815 # At this point there should be a result 

816 if not variant: 

817 raise RuntimeError('Something went wrong trying to find the SARS-CoV-2 variant') 

818 print(f'It is {variant}') 

819 # Set which reference domains are present in the structure 

820 domains = [] 

821 for reference_index, reference_id in enumerate(reference_ids): 

822 # If this is not referable then there are no domains to mine 

823 if reference_id == NO_REFERABLE_FLAG: 

824 continue 

825 # If the reference was not found then there are no domains to mine 

826 if reference_id == NOT_FOUND_FLAG: 

827 continue 

828 # Get residue numbers of residues which belong to the current reference in th residue map 

829 residue_numbers = [] 

830 for residue_index, residue_reference_index in enumerate(residue_reference_indices): 

831 if residue_reference_index == reference_index: 

832 residue_numbers.append(residue_reference_numbers[residue_index]) 

833 # Set a function to find if any of the residue numbers is within a given range 

834 def in_range (start : int, end : int) -> bool: 

835 for residue_number in residue_numbers: 

836 if residue_number >= start and residue_number <= end: 

837 return True 

838 return False 

839 # Get the reference data for the current reference uniprot id 

840 reference_data = references_data.get(reference_id, None) 

841 if not reference_data: 

842 raise RuntimeError(reference_id + ' is not in the references data file') 

843 # Iterate over data domains 

844 for domain in reference_data['domains']: 

845 selection = domain['selection'] 

846 # Iterate over the residue ranges in the selection field 

847 residue_ranges = selection.split(', ') 

848 for residue_range in residue_ranges: 

849 start, end = [ int(num) for num in residue_range.split('-') ] 

850 # If this range includes any resiudes in the residue map then the domain is included in the list 

851 if in_range(start, end): 

852 domains.append(domain['name']) 

853 break 

854 # Return the sequence matadata 

855 return { 

856 'sequences': sequences, 

857 'protein_sequences': protein_sequences, 

858 'nucleic_sequences': nucleic_sequences, 

859 'domains': domains, 

860 'cv19_variant': variant 

861 }