Coverage for mddb_workflow/tools/generate_map.py: 65%

515 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 15:48 +0000

1import sys 

2import json 

3import urllib.request 

4import numpy as np 

5import re 

6 

7from mddb_workflow.utils.auxiliar import protein_residue_name_to_letter, NoReferableException 

8from mddb_workflow.utils.auxiliar import InputError, warn, load_json, save_json, request_pdb_data 

9from mddb_workflow.utils.constants import REFERENCE_SEQUENCE_FLAG, NO_REFERABLE_FLAG, NOT_FOUND_FLAG 

10from mddb_workflow.utils.file import File 

11from mddb_workflow.utils.type_hints import * 

12 

13import xmltodict 

14 

15from Bio import Align 

16from Bio.Blast import NCBIWWW 

17from Bio.Align import substitution_matrices, Alignment 

18 

19# Set generic sequences which should be similar to known antibodies 

20REFERENCE_ANTIBODY_SEQUENCES = { 

21 # Single domain antibody 

22 # This sequence has been designed with the aid of DeepSeek 

23 # CDR regionds have been replaced by X 

24 'QVQLVESGGGLVQPGGSLRLSCAASXXXXXXXWYRQAPGKEREFVAXXXXXXRFTISRDNAKNTVYLQMNSLKPEDTAVYYCXXXXXXXXXXWGQGTQVTVSS' 

25} 

26 

27 

28# Save current stderr to further restore it 

29stderr_backup = sys.stderr 

30# Suppress stderr 

31#sys.stderr = None 

32# Import the function 

33#from Bio.SubsMat import MatrixInfo 

34# Restore stderr 

35#sys.stderr = stderr_backup 

36 

37# AGUS: Necesitamos que la secuencia de aa alineada tenga gaps identificados con guiones y pairwaise2 (biopython < 1.80) no lo hace 

38# AGUS: Para actualizar a biopython >= 1.80 pairwaise2 ya no existe y no podemos obtener el mismo outoput que necesitábamos 

39# AGUS: Esta función parece resolver el problema: https://github.com/biopython/biopython/issues/4769 

40# Set the aligner  

41aligner = Align.PairwiseAligner(mode='local') 

42aligner.substitution_matrix = substitution_matrices.load("BLOSUM62") 

43aligner.open_gap_score = -10 

44aligner.extend_gap_score = -0.5 

45def add_leading_and_trailing_gaps(alignment: Alignment) -> Alignment: 

46 coords = alignment.coordinates 

47 

48 # We need to add two columns on each side of the coordinates matrix. 

49 # The first column will always point to the start of the sequences, 

50 # hence filled with zeros. 

51 new_coords = np.zeros((2, coords.shape[1] + 4), dtype=int) 

52 

53 # The last w column will always point to the end of sequences, 

54 # hence filled with the length of the respective sequences. 

55 target_len = len(alignment.sequences[0]) 

56 query_len = len(alignment.sequences[1]) 

57 last_col = np.array([target_len, query_len]) 

58 new_coords[:, -1] = last_col 

59 

60 # The middle columns will contain the original coordinates. 

61 new_coords[:, 2:-2] = coords 

62 

63 # The second and one before last columns will point to the end of sequence, 

64 # with less unaligned positions. 

65 new_coords[:, 1] = coords[:, 0] - coords[:, 0].min() 

66 new_coords[:, -2] = coords[:, -1] + (last_col - coords[:, -1]).min() 

67 return Alignment(sequences=alignment.sequences, coordinates=new_coords) 

68 

69# Map the structure aminoacids sequences against the standard reference sequences 

70# References are uniprot accession ids and they are optional 

71# For each reference, align the reference sequence with the topology sequence 

72# Chains which do not match any reference sequence will be blasted 

73# Note that an internet connection is required both to retireve the uniprot reference sequence and to do the blast 

74# NEVER FORGET: This system relies on the fact that topology chains are not repeated 

75def generate_protein_mapping ( 

76 structure : 'Structure', 

77 output_filepath : str, 

78 database_url : str, 

79 cache : 'Cache', 

80 register : dict, 

81 mercy : list[str] = [], 

82 input_protein_references : list | dict = [], 

83 pdb_ids : list[str] = [], 

84) -> dict: 

85 """Map the structure aminoacids sequences against the Uniprot reference sequences.""" 

86 # Set the output file 

87 protein_references_file = File(output_filepath) 

88 

89 # Remove previous warnings, if any 

90 register.remove_warnings(REFERENCE_SEQUENCE_FLAG) 

91 # Forced references must be list or dict 

92 # If it is none then we set it as an empty list 

93 if input_protein_references == None: 

94 input_protein_references = [] 

95 # If forced references is a list of dictionaries then it means the input is wrongly formatted 

96 # This may happen since the inputs file is in YAML format, and a simple hyphen makes the difference 

97 # We can fix it from here anyway 

98 if type(input_protein_references) == list and len(input_protein_references) > 0 and type(input_protein_references[0]) == dict: 

99 input_protein_references = { k: v for fr in input_protein_references for k, v in fr.items() } 

100 # Check if the forced references are strict (i.e. reference per chain, as a dictionary) or flexible (list of references) 

101 strict_references = type(input_protein_references) == dict 

102 # Check the "no referable" flag not to be passed when references are not strict 

103 if not strict_references and NO_REFERABLE_FLAG in input_protein_references: 

104 raise SystemExit('WRONG INPUT: The "no referable" flag cannot be passed in a list.' \ 

105 ' You must use a chain keys dictionary (e.g. {"A":"' + NO_REFERABLE_FLAG + '"})') 

106 # Store all the references which are got through this process 

107 # Note that not all references may be used at the end 

108 references = {} 

109 # Given a uniprot accession, get the reference object 

110 # Try first asking to the MDposit database in case the reference exists already 

111 # If not, retrieve UniProt data and build the reference object 

112 # Return also a boolean to set if the reference already existed (True) or not (False) 

113 def get_reference (uniprot_accession : str) -> tuple[dict, bool]: 

114 reference = references.get(uniprot_accession, None) 

115 if reference: 

116 return reference, True 

117 reference = get_mdposit_reference(uniprot_accession, database_url) 

118 if reference: 

119 return reference, True 

120 reference = get_uniprot_reference(uniprot_accession) 

121 return reference, False 

122 # Import local references, in case the references json file already exists 

123 imported_references = None 

124 if protein_references_file.exists: 

125 imported_references = import_references(protein_references_file) 

126 # Append the imported references to the overall references pool 

127 for k,v in imported_references.items(): 

128 references[k] = v 

129 # Get the structure chain sequences 

130 parsed_chains = get_parsed_chains(structure) 

131 # Find out which chains are protein 

132 protein_parsed_chains = [] 

133 for chain_data in parsed_chains: 

134 sequence = chain_data['sequence'] 

135 if next((letter for letter in sequence if letter != 'X'), None): 

136 chain_data['match'] = { 'ref': None, 'map': None, 'score': 0 } 

137 protein_parsed_chains.append(chain_data) 

138 # If there are no protein sequences then there is no need to map anything 

139 if len(protein_parsed_chains) == 0: 

140 print(' There are no protein sequences') 

141 return protein_parsed_chains 

142 # For each input forced reference, get the reference sequence 

143 reference_sequences = {} 

144 # Save already tried alignments to not repeat the alignment further 

145 tried_alignments = { chain_data['name']: [] for chain_data in protein_parsed_chains } 

146 # Set a function to try to match all protein sequences with the available reference sequences 

147 # In case of match, objects in the 'protein_parsed_chains' list are modified by adding the result 

148 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not 

149 def match_sequences () -> bool: 

150 # Track each chain-reference alignment match and keep the score of successful alignments 

151 # Now for each structure sequence, align all reference sequences and keep the best alignment (if it meets the minimum) 

152 for chain_data in protein_parsed_chains: 

153 chain = chain_data['name'] 

154 chain_tried_alignments = tried_alignments[chain] 

155 # In case references are forced per chain check if there is a reference for this chain and match according to this 

156 if strict_references: 

157 # Get the forced specific chain for this sequence, if any 

158 forced_reference = input_protein_references.get(chain, None) 

159 if forced_reference: 

160 # If the chain has a specific forced reference then we must align it just once 

161 # Skip this process in further matches 

162 if chain_data['match']['ref']: 

163 continue 

164 # In case the forced reference is the "no referable" flag 

165 # Thus it has no reference sequence and we must not try to match it 

166 # Actually, any match would be accidental and not correct 

167 if forced_reference == NO_REFERABLE_FLAG: 

168 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG } 

169 continue 

170 # In case the forced reference is the "not found" flag 

171 # This should not happend but we may be using references as forced references, so just in case 

172 if forced_reference == NOT_FOUND_FLAG: 

173 chain_data['match'] = { 'ref': NOT_FOUND_FLAG } 

174 continue 

175 # Get the forced reference sequence and align it to the chain sequence in order to build the map 

176 reference_sequence = reference_sequences[forced_reference] 

177 print(f' Aligning chain {chain} with {forced_reference} reference sequence') 

178 align_results = align(reference_sequence, chain_data['sequence']) 

179 # The align must match or we stop here and warn the user 

180 if not align_results: 

181 raise SystemExit(f'Forced reference {chain} -> {forced_reference} does not match in sequence') 

182 sequence_map, align_score = align_results 

183 reference = references[forced_reference] 

184 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score } 

185 continue 

186 # If the chain has already a match and this match is among the forced references then stop here 

187 # Forced references have priority and this avoids having a match with a not forced reference further 

188 # Same behaviour if the match is with an unreferable sequence 

189 if chain_data['match']['ref'] and ( chain_data['match']['ref'] == NO_REFERABLE_FLAG 

190 or chain_data['match']['ref'] == NOT_FOUND_FLAG 

191 or chain_data['match']['ref']['uniprot'] in input_protein_references): 

192 continue 

193 # Iterate over the different available reference sequences 

194 for uniprot_id, reference_sequence in reference_sequences.items(): 

195 # If this alignment has been tried already then skip it 

196 if uniprot_id in chain_tried_alignments: 

197 continue 

198 # Align the structure sequence with the reference sequence 

199 # NEVER FORGET: This system relies on the fact that topology chains are not repeated 

200 print(f' Aligning chain {chain} with {uniprot_id} reference sequence') 

201 align_results = align(reference_sequence, chain_data['sequence']) 

202 tried_alignments[chain].append(uniprot_id) # Save the alignment try, no matter if it works or not 

203 if not align_results: 

204 continue 

205 # In case we have a valid alignment, check the alignment score is better than the current reference score (if any) 

206 sequence_map, align_score = align_results 

207 current_reference = chain_data['match'] 

208 if current_reference['score'] > align_score: 

209 continue 

210 # If the match is a 'no referable' exception then set a no referable flag 

211 if type(uniprot_id) == NoReferableException: 

212 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG } 

213 continue 

214 # Proceed to set the corresponding reference toherwise 

215 reference = references[uniprot_id] 

216 # If the alignment is better then we impose the new reference 

217 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score } 

218 # Sum up the current matching 

219 print(' Protein reference summary:') 

220 for chain_data in parsed_chains: 

221 name = chain_data['name'] 

222 match = chain_data.get('match', None) 

223 if not match: 

224 print(f' {name} -> Not protein') 

225 continue 

226 reference = chain_data['match'].get('ref', None) 

227 if not reference: 

228 print(f' {name} -> ¿?') 

229 continue 

230 if reference == NO_REFERABLE_FLAG: 

231 print(f' {name} -> No referable') 

232 continue 

233 if reference == NOT_FOUND_FLAG: 

234 print(f' {name} -> Not found') 

235 continue 

236 uniprot_id = reference['uniprot'] 

237 print(f' {name} -> {uniprot_id}') 

238 # Export already matched references 

239 export_references(protein_parsed_chains, protein_references_file) 

240 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not 

241 allright = all([ chain_data['match']['ref'] for chain_data in protein_parsed_chains ]) 

242 # If we match all chains then make sure there is no forced reference missing which did not match 

243 # Otherwise stop here and force the user to remove these forced uniprot ids from the inputs file 

244 # WARNING: Although we could move on it is better to stop here and warn the user to prevent a future silent problem 

245 if allright and input_protein_references: 

246 # Get forced uniprot ids 

247 forced_uniprot_ids = set(list(input_protein_references.values()) if strict_references else input_protein_references) 

248 forced_uniprot_ids -= { NOT_FOUND_FLAG, NO_REFERABLE_FLAG } 

249 #forced_uniprot_ids.remove(NO_REFERABLE_FLAG) 

250 #forced_uniprot_ids.remove(NOT_FOUND_FLAG) 

251 # Get matched uniprot ids 

252 matched_references = [ chain_data['match']['ref'] for chain_data in protein_parsed_chains ] 

253 matched_uniprot_ids = set([ ref['uniprot'] for ref in matched_references if type(ref) == dict ]) 

254 # Check the difference 

255 unmatched_uniprot_ids = forced_uniprot_ids - matched_uniprot_ids 

256 if len(unmatched_uniprot_ids) > 0: 

257 log = ', '.join(unmatched_uniprot_ids) 

258 raise InputError(f'Some forced references were not matched with any protein sequence: {log}\n' 

259 ' Please remove them from the inputs file') 

260 return allright 

261 # --- End of match_sequences function -------------------------------------------------------------------------------- 

262 # First use the forced references for the matching 

263 if input_protein_references: 

264 forced_uniprot_ids = list(input_protein_references.values()) if strict_references else input_protein_references 

265 for uniprot_id in forced_uniprot_ids: 

266 # If instead of a uniprot id there is a 'no referable' flag then we skip this process 

267 if uniprot_id == NO_REFERABLE_FLAG: 

268 continue 

269 # If instead of a uniprot id there is a 'not found' flag then we skip this process 

270 # This should not happend but we may be using references as forced references, so just in case 

271 if uniprot_id == NOT_FOUND_FLAG: 

272 continue 

273 # If reference is already in the list (i.e. it has been imported) then skip this process 

274 reference = references.get(uniprot_id, None) 

275 if reference: 

276 reference_sequences[uniprot_id] = reference['sequence'] 

277 continue 

278 # Find the reference data for the given uniprot id 

279 reference, already_loaded = get_reference(uniprot_id) 

280 reference_sequences[uniprot_id] = reference['sequence'] 

281 # Save the current whole reference object for later 

282 references[reference['uniprot']] = reference 

283 # Now that we have all forced references data perform the matching 

284 # If we have every protein chain matched with a reference then we stop here 

285 print(' Using only forced references from the inputs file') 

286 if match_sequences(): 

287 return protein_parsed_chains 

288 # Now add the imported references to reference sequences. Thus now they will be 'matchable' 

289 # Thus now they will be 'matchable', so try to match sequences again in case any of the imported references has not been tried 

290 # It was not done before since we want forced references to have priority 

291 if imported_references: 

292 need_rematch = False 

293 for uniprot_id, reference in imported_references.items(): 

294 # If the imported reference has been aligned already (i.e. it was a forced reference) 

295 if uniprot_id in reference_sequences: 

296 continue 

297 # Otherwise, include it 

298 need_rematch = True 

299 reference_sequences[uniprot_id] = reference['sequence'] 

300 # If there was at least one imported reference missing then rerun the matching 

301 if need_rematch: 

302 print(' Using also references imported from references.json') 

303 if match_sequences(): 

304 return protein_parsed_chains 

305 # If there are still any chain which is not matched with a reference then we need more references 

306 # To get them, retrieve all uniprot ids associated to the pdb ids, if any 

307 if pdb_ids and len(pdb_ids) > 0: 

308 for pdb_id in pdb_ids: 

309 # Ask PDB 

310 uniprot_ids = pdb_to_uniprot(pdb_id) 

311 for uniprot_id in uniprot_ids: 

312 # If this is not an actual UniProt, but a no referable exception, then handle it 

313 # We must find the matching sequence and set the corresponding chain as no referable 

314 if type(uniprot_id) == NoReferableException: 

315 reference_sequences[uniprot_id] = uniprot_id.sequence 

316 continue 

317 # Build a new reference from the resulting uniprot 

318 reference, already_loaded = get_reference(uniprot_id) 

319 if reference == None: 

320 continue 

321 reference_sequences[reference['uniprot']] = reference['sequence'] 

322 # Save the current whole reference object for later 

323 references[reference['uniprot']] = reference 

324 # If we have every protein chain matched with a reference already then we stop here 

325 print(' Using also references related to PDB ids from the inputs file') 

326 if match_sequences(): 

327 return protein_parsed_chains 

328 # If there are still any chain which is not matched with a reference then we need more references 

329 # To get them, we run a blast with each orphan chain sequence 

330 for chain_data in protein_parsed_chains: 

331 # Skip already references chains 

332 if chain_data['match']['ref']: 

333 continue 

334 # Run the blast 

335 sequence = chain_data['sequence'] 

336 uniprot_id = blast(sequence, cache) 

337 if not uniprot_id: 

338 chain_data['match'] = { 'ref': NOT_FOUND_FLAG } 

339 continue 

340 # Build a new reference from the resulting uniprot 

341 reference, already_loaded = get_reference(uniprot_id) 

342 reference_sequences[reference['uniprot']] = reference['sequence'] 

343 # Save the current whole reference object for later 

344 references[reference['uniprot']] = reference 

345 # If we have every protein chain matched with a reference already then we stop here 

346 print(' Using also references from blast') 

347 if match_sequences(): 

348 return protein_parsed_chains 

349 # At this point we should have macthed all sequences 

350 # If not, kill the process unless mercy was given 

351 must_be_killed = REFERENCE_SEQUENCE_FLAG not in mercy 

352 if must_be_killed: 

353 raise SystemExit('BLAST failed to find a matching reference sequence for at least one protein sequence') 

354 warn('BLAST failed to find a matching reference sequence for at least one protein sequence') 

355 register.add_warning(REFERENCE_SEQUENCE_FLAG, 'There is at least one protein region which is not mapped to any reference sequence') 

356 return protein_parsed_chains 

357 

358# Export reference objects data to a json file 

359# This file is used by the loader to load new references to the database 

360# Note that all references are saved to this json file, even those which are already in the database 

361# It is the loader who is the responsible to check which references must be loaded and which ones are loaded already 

362# Note that mapping data (i.e. which residue belongs to each reference) is not saved 

363def export_references (mapping_data : list, protein_references_file : 'File'): 

364 final_references = [] 

365 final_uniprots = [] 

366 for data in mapping_data: 

367 match = data['match'] 

368 ref = match['ref'] 

369 if not ref or ref == NO_REFERABLE_FLAG or ref == NOT_FOUND_FLAG: 

370 continue 

371 uniprot = ref['uniprot'] 

372 if uniprot in final_uniprots: 

373 continue 

374 final_references.append(ref) 

375 final_uniprots.append(uniprot) 

376 # If there are no references to exported then do not egenrate the json file 

377 if len(final_references) == 0: 

378 return 

379 # Write references to a json file 

380 save_json(final_references, protein_references_file.path, indent = 4) 

381 

382# Import reference json file so we do not have to rerun this process 

383def import_references (protein_references_file : 'File') -> list: 

384 print(' Importing references from ' + protein_references_file.path) 

385 # Read the file 

386 file_content = load_json(protein_references_file.path) 

387 # Format data as the process expects to find it 

388 references = {} 

389 for reference in file_content: 

390 uniprot = reference['uniprot'] 

391 references[uniprot] = reference 

392 return references 

393 

394# Get each chain name and aminoacids sequence in a topology 

395# Output format example: [ { 'sequence': 'VNLTT', 'indices': [1, 2, 3, 4, 5] }, ... ] 

396def get_parsed_chains (structure : 'Structure') -> list: 

397 parsed_chains = [] 

398 chains = structure.chains 

399 for chain in chains: 

400 name = chain.name 

401 sequence = '' 

402 residue_indices = [] 

403 for residue in chain.residues: 

404 letter = protein_residue_name_to_letter(residue.name) 

405 sequence += letter 

406 residue_indices.append(residue.index) 

407 sequence_object = { 'name': name, 'sequence': sequence, 'residue_indices': residue_indices } 

408 parsed_chains.append(sequence_object) 

409 return parsed_chains 

410 

411# Align two aminoacid sequences 

412# Return a list with the reference residue indexes (values) 

413# which match each new sequence residues indexes (indexes) 

414# Return also the score of the alignment 

415# Return None when there is not valid alignment at all 

416# Set verbose = True to see a visual summary of the sequence alignments in the logs 

417def align (ref_sequence : str, new_sequence : str, verbose : bool = False) -> Optional[ tuple[list, float] ]: 

418 

419 #print('- REFERENCE\n' + ref_sequence + '\n- NEW\n' + new_sequence) 

420 

421 # If the new sequence is all 'X' stop here, since this would make the alignment infinite 

422 # Then an array filled with None is returned 

423 if all([ letter == 'X' for letter in new_sequence ]): 

424 return None 

425 

426 # Return the new sequence as best aligned as possible with the reference sequence 

427 #alignments = pairwise2.align.localds(ref_sequence, new_sequence, MatrixInfo.blosum62, -10, -0.5) 

428 # DANI: Habría que hacerlo de esta otra forma según el deprecation warning (arriba hay más código) 

429 # DANI: El problema es que el output lo tiene todo menos la sequencia en formato alienada 

430 # DANI: i.e. formato '----VNLTT' (con los gaps identificados con guiones), que es justo el que necesito 

431 # Actualized code for biopython >= 1.80 

432 alignments = aligner.align(ref_sequence, new_sequence) 

433 

434 # In case there are no alignments it means the current chain has nothing to do with this reference 

435 # Then an array filled with None is returned 

436 if len(alignments) == 0: 

437 return None 

438 

439 # Several alignments may be returned, specially when it is a difficult or impossible alignment 

440 # Output format example: '----VNLTT' 

441 best_alignment = alignments[0] 

442 best_alignment_with_gaps = add_leading_and_trailing_gaps(best_alignment) 

443 # Extract the allignment sequences 

444 #aligned_ref, aligned_sequence = best_alignment_with_gaps.sequences 

445 aligned_ref = best_alignment_with_gaps[0] # AGUS: Puede ser útil en el futuro 

446 aligned_sequence = best_alignment_with_gaps[1] 

447 # Obtain the score of the alignment 

448 score = best_alignment.score 

449 # Calculate the normalized score 

450 normalized_score = score / len(new_sequence) 

451 

452 # If the normalized score does not reaches the minimum we consider the alignment is not valid 

453 # It may happen when the reference goes for a specific chain but we must map all chains 

454 # This 1 has been found experimentally 

455 # Non maching sequence may return a 0.1-0.3 normalized score 

456 # Matching sequence may return >4 normalized score 

457 if normalized_score < 2: 

458 print(' Not valid alignment') 

459 return None 

460 

461 # Tell the user about the success 

462 beautiful_normalized_score = round(normalized_score * 100) / 100 

463 print(f' Valid alignment -> Normalized score = {beautiful_normalized_score}') 

464 

465 # Match each residue 

466 aligned_mapping = [] 

467 aligned_index = 0 

468 for l, letter in enumerate(aligned_sequence): 

469 # Guions are skipped 

470 if letter == '-': 

471 continue 

472 # Get the current residue of the new sequence 

473 equivalent_letter = new_sequence[aligned_index] 

474 if not letter == equivalent_letter: 

475 raise SystemExit(f'Something was wrong at position {l} :S') 

476 # 'X' residues cannot be mapped since reference sequences should never have any 'X' 

477 if letter == 'X': 

478 aligned_mapping.append(None) 

479 aligned_index += 1 

480 continue 

481 # Otherwise add the equivalent aligned index to the mapping 

482 # WARNING: Add +1 since uniprot residue counts start at 1, not 0 

483 aligned_mapping.append(l + 1) 

484 aligned_index += 1 

485 

486 return aligned_mapping, normalized_score 

487 

488# Given an aminoacids sequence, return a list of uniprot ids 

489# Note that we are blasting against UniProtKB / Swiss-Prot so results will always be valid UniProt accessions 

490# WARNING: This always means results will correspond to curated entries only 

491# If your sequence is from an exotic organism the result may be not from it but from other more studied organism 

492# Since this function may take some time we always cache the result 

493def blast (sequence : str, cache : Optional['Cache'] = None) -> Optional[str]: 

494 # Check if we have the resulted saved in cache already 

495 if cache != None: 

496 cached_blasts = cache.retrieve('blasts', {}) 

497 if sequence in cached_blasts: 

498 accession = cached_blasts[sequence] 

499 print(f'Using previous blast result from cache: {accession}') 

500 return accession 

501 print('Throwing blast...') 

502 result = NCBIWWW.qblast( 

503 program = "blastp", 

504 database = "swissprot", # UniProtKB / Swiss-Prot 

505 sequence = sequence, 

506 ) 

507 parsed_result = xmltodict.parse(result.read()) 

508 hits = parsed_result['BlastOutput']['BlastOutput_iterations']['Iteration']['Iteration_hits'] 

509 # When there is no result return None 

510 # Note that this is possible although hardly unprobable 

511 if not hits: 

512 return None 

513 # Get the first result only 

514 result = hits['Hit'][0] 

515 # Return the accession 

516 # DANI: Si algun día tienes problemas porque te falta el '.1' al final del accession puedes sacarlo de Hit_id 

517 accession = result['Hit_accession'] 

518 print('Result: ' + accession) 

519 # Save the result in the cache 

520 if cache != None: 

521 cached_blasts[sequence] = accession 

522 cached_blasts = cache.update('blasts', cached_blasts) 

523 return accession 

524 

525# Given a uniprot accession, use the MDposit API to request its data in case it is already in the database 

526# If the reference is not found or there is any error then return None and keep going, we do not really need this 

527def get_mdposit_reference (uniprot_accession : str, database_url : str) -> Optional[dict]: 

528 # Request MDposit 

529 request_url = f'{database_url}rest/v1/references/{uniprot_accession}' 

530 try: 

531 with urllib.request.urlopen(request_url) as response: 

532 parsed_response = json.loads(response.read().decode("utf-8")) 

533 # If the accession is not found in the database then we stop here 

534 except urllib.error.HTTPError as error: 

535 # If the uniprot accession is not yet in the MDposit references then return None 

536 if error.code == 404: 

537 return None 

538 else: 

539 warn(f'Something went wrong with the MDposit request {request_url} (error {error.code})') 

540 warn(f'Protein reference {uniprot_accession} will be made again even if it exists') 

541 return None 

542 # This error may occur if there is no internet connection 

543 except urllib.error.URLError as error: 

544 # The error variable as is do not work properly 

545 # Its 'errno' value is None (at least for tiemout errors) 

546 actual_error = error.args[0] 

547 if actual_error.errno == 110: 

548 print('Timeout error when requesting MDposit. Is the node fallen?') 

549 warn('Something went wrong with the MDposit request ' + request_url) 

550 warn(f'Protein reference {uniprot_accession} will be made again even if it exists') 

551 return None 

552 return parsed_response 

553 

554# Given a uniprot accession, use the uniprot API to request its data and then mine what is needed for the database 

555def get_uniprot_reference (uniprot_accession : str) -> Optional[dict]: 

556 # Request Uniprot 

557 request_url = 'https://www.ebi.ac.uk/proteins/api/proteins/' + uniprot_accession 

558 parsed_response = None 

559 request = urllib.request.Request(request_url) 

560 # One day the API was returning a 'Secure Connection Failed' error and this header fixed the problem 

561 request.add_header('Referer', 'https://www.uniprot.org/') 

562 try: 

563 with urllib.request.urlopen(request) as response: 

564 parsed_response = json.loads(response.read().decode("utf-8")) 

565 # If the accession is not found in UniProt then the id is not valid 

566 except urllib.error.HTTPError as error: 

567 if error.code == 404: 

568 warn('Cannot find UniProt entry for accession ' + uniprot_accession) 

569 return None 

570 print('Error when requesting ' + request_url) 

571 raise RuntimeError(f'Something went wrong with the Uniprot request (error {error.code})') 

572 except: 

573 print('Error when requesting ' + request_url) 

574 raise RuntimeError(f'Something went very wrong with the Uniprot request') 

575 # If we have not a response at this point then it may mean we are trying to access an obsolete entry (e.g. P01607) 

576 if parsed_response == None: 

577 warn('Cannot find UniProt entry for accession ' + uniprot_accession) 

578 return None 

579 # Get the full protein name 

580 protein_data = parsed_response['protein'] 

581 protein_name_data = protein_data.get('recommendedName', None) 

582 # DANI: It is possible that the 'recommendedName' is missing if it is not a reviewed UniProt entry 

583 if not protein_name_data: 

584 warn(f'The UniProt accession {uniprot_accession} is missing the recommended name. You should consider changing the reference.') 

585 protein_name_data = protein_data.get('submittedName', None)[0] 

586 if not protein_name_data: 

587 raise ValueError('Unexpected structure in UniProt response for accession ' + uniprot_accession) 

588 protein_name = protein_name_data['fullName']['value'] 

589 # Get the gene names as a single string 

590 gene_names = [] 

591 # WARNING: Some uniprot entries are missing gene names (e.g. P00718) 

592 # WARNING: Some uniprot entries have names in non-canonical keys (e.g. orfNames, olnNames) 

593 # These names are not shown event in the uniprot web page so we also skip them 

594 genes = parsed_response.get('gene', []) 

595 for gene in genes: 

596 gene_name = gene.get('name', None) 

597 if not gene_name: 

598 continue 

599 gene_names.append(gene_name['value']) 

600 gene_names = ', '.join(gene_names) if len(gene_names) > 0 else None 

601 # Get the organism name 

602 organism = parsed_response['organism']['names'][0]['value'] 

603 # Get the aminoacids sequence 

604 sequence = parsed_response['sequence']['sequence'] 

605 # Get interesting regions to be highlighted in the client 

606 domains = [] 

607 # WARNING: Some uniprot entries are missing features (e.g. O27908) 

608 features = parsed_response.get('features', []) 

609 # Get comments data which may be useful to further build domain descriptions 

610 comments_data = parsed_response.get('comments', None) 

611 # There are many types of features but in this case we will focus on domanins and similar 

612 target_types = ['CHAIN', 'REGION', 'DOMAIN', 'MOTIF', 'SITE'] 

613 for feature in features: 

614 # Skip features of other types 

615 if feature['type'] not in target_types: 

616 continue 

617 # Get the domain name 

618 name = feature['description'] 

619 # Build the domain description from the coments data 

620 description = None 

621 if comments_data: 

622 comments = [ comment for comment in parsed_response['comments'] if name == comment.get('molecule', None) ] 

623 comment_text = [] 

624 for comment in comments: 

625 if not comment.get('text', False): continue 

626 text = comment.get('text', None) 

627 if text == None: raise ValueError('Unexpected UniProt response format: no text in comment') 

628 # DANI: el comment 'text' casi siempre es una lista 

629 # DANI: solo tengo constancia de una vez en que era un string directamente 

630 # DANI: en uno de los comentarios de https://www.ebi.ac.uk/proteins/api/proteins/Q15465 

631 if type(text) == str: comment_text.append(text) 

632 elif type(text) == list: comment_text.append(text[0]['value']) 

633 else: raise ValueError('Unexpected UniProt response format: text in comment is neither str or list') 

634 description = '\n\n'.join(comment_text) 

635 # Set the domain selection 

636 # The domain 'begin' and 'end' values may include non-numeric symbols such as '~', '>' or '<' 

637 # These values are usually ignored or replaced by '?' in the UniProt web client 

638 # There may be not numeric value at all (e.g. Q99497) 

639 # In these cases uniprot shows the domain but it does not allow to use its functionallities  

640 # e.g. you can not blast using the domain sequence 

641 # It makes not sense having a domain with unkown range to me so we skip these domains 

642 begin = feature['begin'].replace('~', '').replace('>', '').replace('<', '') 

643 end = feature['end'].replace('~', '').replace('>', '').replace('<', '') 

644 if begin == '' or end == '': 

645 continue 

646 selection = begin + '-' + end 

647 # If we already have a domain with the same name then join both domains 

648 # For instance, you may have several repetitions of the 'Disordered' region 

649 already_existing_domain = next((domain for domain in domains if domain['name'] == name), None) 

650 if already_existing_domain: 

651 already_existing_domain['selection'] += ', ' + selection 

652 continue 

653 # Otherwise, create a new domain 

654 domain = { 

655 'name': name, 

656 'selection': selection 

657 } 

658 # Add a description only if we succesfully mined it 

659 # Note that only features tagged as CHAIN have comments (not sure about this) 

660 if description: 

661 domain['description'] = description 

662 # Add the new domain to the list 

663 domains.append(domain) 

664 # Mine protein functions from Gene Ontology references 

665 # Get database references 

666 db_references = parsed_response['dbReferences'] 

667 # Get references from Gene Ontology (GO) only 

668 go_references = [ ref for ref in db_references if ref['type'] == 'GO' ] 

669 # A Gene Ontology entry may be one of three types: 

670 # Cellular Component (C), Molecular Function (F) and Biological Process (P) 

671 # In this case we are interested in protein function only so we will keep those with the 'F' header 

672 functions = [ ref['properties']['term'][2:] for ref in go_references if ref['properties']['term'][0:2] == 'F:' ] 

673 # Set the final reference to be uploaded to the database 

674 return { 

675 'name': protein_name, 

676 'gene': gene_names, 

677 'organism': organism, 

678 'uniprot': uniprot_accession, 

679 'sequence': sequence, 

680 'domains': domains, 

681 'functions': functions 

682 } 

683 

684# Given a pdb Id, get its uniprot id 

685# e.g. 6VW1 -> Q9BYF1, P0DTC2, P59594 

686def pdb_to_uniprot (pdb_id : str) -> list[ str | NoReferableException ]: 

687 # Set the request query 

688 query = '''query ($id: String!) { 

689 entry(entry_id: $id) { 

690 polymer_entities { 

691 rcsb_polymer_entity_container_identifiers { uniprot_ids } 

692 rcsb_entity_source_organism { scientific_name } 

693 entity_poly { pdbx_seq_one_letter_code } 

694 } 

695 } 

696 }''' 

697 # Request PDB data 

698 parsed_response = request_pdb_data(pdb_id, query) 

699 # Mine data 

700 uniprot_ids = [] 

701 for polymer in parsed_response['polymer_entities']: 

702 identifier = polymer['rcsb_polymer_entity_container_identifiers'] 

703 # Get the uniprot 

704 uniprots = identifier.get('uniprot_ids', None) 

705 # If there are not UniProts at all in this entity skip to the next 

706 if not uniprots: 

707 # However, it may happen the the polymer organism is set as synthetic construct 

708 # In this case we keep the sequence and return a no referable exception 

709 organisms = polymer.get('rcsb_entity_source_organism', None) 

710 if not organisms: raise ValueError('Missing organism') 

711 # Get scientific names in lower caps since sometimes they are all upper caps 

712 scientific_names = set( 

713 [ (organism.get('scientific_name') or '').lower() for organism in organisms ]) 

714 if 'synthetic construct' in scientific_names: 

715 entity = polymer.get('entity_poly', None) 

716 if not entity: raise ValueError(f'Missing entity in {pdb_id}') 

717 sequence = entity.get('pdbx_seq_one_letter_code', None) 

718 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}') 

719 # WARNING: A syntetic construct may have a "special" sequence 

720 # It may combine one-letter code with 3-letter code in parenthesis for special aminoacids 

721 # e.g. 5JMO, entity 3 -> (DKA)RVK(AR7)(0QE) 

722 # We simply replace these special aminoacids by X 

723 sequence = re.sub(r'\([0-9A-Z]{3}\)', 'X', sequence) 

724 uniprot_ids.append( NoReferableException(sequence) ) 

725 continue 

726 # Check if the sequence of this chain may belong to an antibody 

727 entity = polymer.get('entity_poly', None) 

728 if not entity: continue 

729 sequence = entity.get('pdbx_seq_one_letter_code', None) 

730 if not sequence: continue 

731 print(' Could this be an antibody?') 

732 is_antibody = False 

733 for reference_sequence in REFERENCE_ANTIBODY_SEQUENCES: 

734 if align(reference_sequence, sequence): 

735 is_antibody = True 

736 break 

737 conclusion = f' I would guess it {"is" if is_antibody else "is not"} an antibody' 

738 print(conclusion) 

739 # If so, the also set this chain as no referable since antibodies have no UniProt id 

740 if is_antibody: 

741 uniprot_ids.append( NoReferableException(sequence) ) 

742 continue 

743 # If we did not fall in any of the previous sections then continue 

744 continue 

745 # If we have multiple uniprots in a single entity then we skip them 

746 # Note tha they belong to an entity which is no referable (e.g. a chimeric entity) 

747 # See 5GY2 and 7E2Z labeled as chimeric entities and 6e67, which is not labeled likewise 

748 if len(uniprots) > 1: 

749 warn(f'Multiple UniProt ids in the same entity in {pdb_id} -> Is this a chimeric entity?') 

750 entity = polymer.get('entity_poly', None) 

751 if not entity: raise ValueError(f'Missing entity in {pdb_id}') 

752 sequence = entity.get('pdbx_seq_one_letter_code', None) 

753 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}') 

754 uniprot_ids.append( NoReferableException(sequence) ) 

755 continue 

756 uniprot_id = uniprots[0] 

757 uniprot_ids.append(uniprot_id) 

758 actual_uniprot_ids = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == str ] 

759 if len(actual_uniprot_ids) > 0: 

760 print(f' UniProt ids for PDB id {pdb_id}: ' + ', '.join(actual_uniprot_ids)) 

761 else: print(f' No UniProt ids were found for PDB id {pdb_id}') 

762 no_refs = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == NoReferableException ] 

763 no_refs_count = len(no_refs) 

764 if no_refs_count > 0: 

765 print(f' Also encountered {no_refs_count} no refereable sequences for PDB id {pdb_id}') 

766 return uniprot_ids 

767 

768# This function is used by the generate_metadata script 

769# 1. Get structure sequences 

770# 2. Calculate which reference domains are covered by the previous sequence 

771# 3. In case it is a covid spike, align the previous sequence against all saved variants (they are in 'utils') 

772from mddb_workflow.resources.covid_variants import covid_spike_variants 

773def get_sequence_metadata (structure : 'Structure', protein_references_file : 'File', residue_map : dict) -> dict: 

774 # Mine sequences from the structure 

775 sequences = [] 

776 # Classify sequences according to if they belong to protein or nucleic sequences 

777 # WARNING: We are interested in unique sequence BUT we also want to keep the order 

778 # WARNING: Do NOT use sets here to the order of appearance in the structure is respected 

779 protein_sequences = [] 

780 nucleic_sequences = [] 

781 # Iterate structure chains 

782 for chain in structure.chains: 

783 # Get the current chain sequence and add it to the list 

784 sequence = chain.get_sequence() 

785 sequences.append(sequence) 

786 # Get the chain classification 

787 classification = chain.get_classification() 

788 # Depending on what it is, add the sequence also in the corresponding list 

789 if classification == 'protein' and sequence not in protein_sequences: 

790 protein_sequences.append(sequence) 

791 elif (classification == 'dna' or classification == 'rna') and sequence not in nucleic_sequences: 

792 nucleic_sequences.append(sequence) 

793 # Get values from the residue map 

794 # Get protein references from the residues map 

795 reference_ids = [] 

796 references = residue_map['references'] 

797 if references and len(references) > 0: 

798 for ref, ref_type in zip(references, residue_map['reference_types']): 

799 if ref_type == 'protein': 

800 reference_ids.append(ref) 

801 residue_reference_numbers = residue_map['residue_reference_numbers'] 

802 residue_reference_indices = residue_map['residue_reference_indices'] 

803 # Load references data, which should already be save to the references data file 

804 references_data = import_references(protein_references_file) if protein_references_file.exists else {} 

805 # In case we have the SARS-CoV-2 spike among the references, check also which is the variant it belongs to 

806 # DANI: Esto es un parche. En un futuro buscaremos una manera mejor de comprovar variantes en cualquier contexto 

807 variant = None 

808 covid_spike_reference_id = 'P0DTC2' 

809 if covid_spike_reference_id in reference_ids: 

810 # Load the reference data and find a chain which belongs to the spike 

811 # We consider having only one variant, so all chains should return the same result 

812 reference_data = references_data[covid_spike_reference_id] 

813 covid_spike_reference_index = reference_ids.index(covid_spike_reference_id) 

814 sample_residue_index = next((residue_index for residue_index, reference_index in enumerate(residue_reference_indices) if reference_index == covid_spike_reference_index), None) 

815 if sample_residue_index == None: 

816 raise SystemExit('Failed to find residue belonging to SARS-CoV-2 variant') 

817 sample_chain_sequence = structure.residues[sample_residue_index].chain.get_sequence() 

818 # Align the sample chain sequence against all variants to find the best match 

819 highest_score = 0 

820 print('Finding SARS-CoV-2 variant') 

821 for variant_name, variant_sequence in covid_spike_variants.items(): 

822 print(f' Trying with {variant_name}') 

823 align_results = align(variant_sequence, sample_chain_sequence) 

824 if not align_results: 

825 continue 

826 mapping, score = align_results 

827 if score > highest_score: 

828 highest_score = score 

829 variant = variant_name 

830 # At this point there should be a result 

831 if not variant: 

832 raise SystemExit('Something went wrong trying to find the SARS-CoV-2 variant') 

833 print(f'It is {variant}') 

834 # Set which reference domains are present in the structure 

835 domains = [] 

836 for reference_index, reference_id in enumerate(reference_ids): 

837 # If this is not referable then there are no domains to mine 

838 if reference_id == NO_REFERABLE_FLAG: 

839 continue 

840 # If the reference was not found then there are no domains to mine 

841 if reference_id == NOT_FOUND_FLAG: 

842 continue 

843 # Get residue numbers of residues which belong to the current reference in th residue map 

844 residue_numbers = [] 

845 for residue_index, residue_reference_index in enumerate(residue_reference_indices): 

846 if residue_reference_index == reference_index: 

847 residue_numbers.append(residue_reference_numbers[residue_index]) 

848 # Set a function to find if any of the residue numbers is within a given range 

849 def in_range (start : int, end : int) -> bool: 

850 for residue_number in residue_numbers: 

851 if residue_number >= start and residue_number <= end: 

852 return True 

853 return False 

854 # Get the reference data for the current reference uniprot id 

855 reference_data = references_data.get(reference_id, None) 

856 if not reference_data: 

857 raise SystemExit(reference_id + ' is not in the references data file') 

858 # Iterate over data domains 

859 for domain in reference_data['domains']: 

860 selection = domain['selection'] 

861 # Iterate over the residue ranges in the selection field 

862 residue_ranges = selection.split(', ') 

863 for residue_range in residue_ranges: 

864 start, end = [ int(num) for num in residue_range.split('-') ] 

865 # If this range includes any resiudes in the residue map then the domain is included in the list 

866 if in_range(start, end): 

867 domains.append(domain['name']) 

868 break 

869 # Return the sequence matadata 

870 return { 

871 'sequences': sequences, 

872 'protein_sequences': protein_sequences, 

873 'nucleic_sequences': nucleic_sequences, 

874 'domains': domains, 

875 'cv19_variant': variant 

876 }