Coverage for model_workflow/tools/generate_map.py: 69%

482 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1import sys 

2import json 

3import urllib.request 

4import numpy as np 

5from model_workflow.utils.auxiliar import protein_residue_name_to_letter, NoReferableException 

6from model_workflow.utils.auxiliar import InputError, warn, load_json, save_json, request_pdb_data 

7from model_workflow.utils.constants import REFERENCE_SEQUENCE_FLAG, NO_REFERABLE_FLAG, NOT_FOUND_FLAG 

8from model_workflow.utils.file import File 

9from model_workflow.utils.type_hints import * 

10 

11import xmltodict 

12 

13from Bio import Align 

14from Bio.Blast import NCBIWWW 

15from Bio.Align import substitution_matrices, Alignment 

16 

17# Save current stderr to further restore it 

18stderr_backup = sys.stderr 

19# Suppress stderr 

20#sys.stderr = None 

21# Import the function 

22#from Bio.SubsMat import MatrixInfo 

23# Restore stderr 

24#sys.stderr = stderr_backup 

25 

26# AGUS: Necesitamos que la secuencia de aa alineada tenga gaps identificados con guiones y pairwaise2 (biopython < 1.80) no lo hace 

27# AGUS: Para actualizar a biopython >= 1.80 pairwaise2 ya no existe y no podemos obtener el mismo outoput que necesitábamos 

28# AGUS: Esta función parece resolver el problema: https://github.com/biopython/biopython/issues/4769 

29# Set the aligner  

30aligner = Align.PairwiseAligner(mode='local') 

31aligner.substitution_matrix = substitution_matrices.load("BLOSUM62") 

32aligner.open_gap_score = -10 

33aligner.extend_gap_score = -0.5 

34def add_leading_and_trailing_gaps(alignment: Alignment) -> Alignment: 

35 coords = alignment.coordinates 

36 

37 # We need to add two columns on each side of the coordinates matrix. 

38 # The first column will always point to the start of the sequences, 

39 # hence filled with zeros. 

40 new_coords = np.zeros((2, coords.shape[1] + 4), dtype=int) 

41 

42 # The last w column will always point to the end of sequences, 

43 # hence filled with the length of the respective sequences. 

44 target_len = len(alignment.sequences[0]) 

45 query_len = len(alignment.sequences[1]) 

46 last_col = np.array([target_len, query_len]) 

47 new_coords[:, -1] = last_col 

48 

49 # The middle columns will contain the original coordinates. 

50 new_coords[:, 2:-2] = coords 

51 

52 # The second and one before last columns will point to the end of sequence, 

53 # with less unaligned positions. 

54 new_coords[:, 1] = coords[:, 0] - coords[:, 0].min() 

55 new_coords[:, -2] = coords[:, -1] + (last_col - coords[:, -1]).min() 

56 return Alignment(sequences=alignment.sequences, coordinates=new_coords) 

57 

58# Map the structure aminoacids sequences against the standard reference sequences 

59# References are uniprot accession ids and they are optional 

60# For each reference, align the reference sequence with the topology sequence 

61# Chains which do not match any reference sequence will be blasted 

62# Note that an internet connection is required both to retireve the uniprot reference sequence and to do the blast 

63# NEVER FORGET: This system relies on the fact that topology chains are not repeated 

64def generate_protein_mapping ( 

65 structure : 'Structure', 

66 output_filepath : str, 

67 database_url : str, 

68 register : dict, 

69 mercy : List[str] = [], 

70 input_protein_references : Union[list,dict] = [], 

71 pdb_ids : List[str] = [], 

72) -> dict: 

73 """Map the structure aminoacids sequences against the Uniprot reference sequences.""" 

74 # Set the output file 

75 protein_references_file = File(output_filepath) 

76 

77 # Remove previous warnings, if any 

78 register.remove_warnings(REFERENCE_SEQUENCE_FLAG) 

79 # Forced references must be list or dict 

80 # If it is none then we set it as an empty list 

81 if input_protein_references == None: 

82 input_protein_references = [] 

83 # If forced references is a list of dictionaries then it means the input is wrongly formatted 

84 # This may happen since the inputs file is in YAML format, and a simple hyphen makes the difference 

85 # We can fix it from here anyway 

86 if type(input_protein_references) == list and len(input_protein_references) > 0 and type(input_protein_references[0]) == dict: 

87 input_protein_references = { k: v for fr in input_protein_references for k, v in fr.items() } 

88 # Check if the forced references are strict (i.e. reference per chain, as a dictionary) or flexible (list of references) 

89 strict_references = type(input_protein_references) == dict 

90 # Check the "no referable" flag not to be passed when references are not strict 

91 if not strict_references and NO_REFERABLE_FLAG in input_protein_references: 

92 raise SystemExit('WRONG INPUT: The "no referable" flag cannot be passed in a list.' \ 

93 ' You must use a chain keys dictionary (e.g. {"A":"' + NO_REFERABLE_FLAG + '"})') 

94 # Store all the references which are got through this process 

95 # Note that not all references may be used at the end 

96 references = {} 

97 # Given a uniprot accession, get the reference object 

98 # Try first asking to the MDposit database in case the reference exists already 

99 # If not, retrieve UniProt data and build the reference object 

100 # Return also a boolean to set if the reference already existed (True) or not (False) 

101 def get_reference (uniprot_accession : str) -> Tuple[dict, bool]: 

102 reference = references.get(uniprot_accession, None) 

103 if reference: 

104 return reference, True 

105 reference = get_mdposit_reference(uniprot_accession, database_url) 

106 if reference: 

107 return reference, True 

108 reference = get_uniprot_reference(uniprot_accession) 

109 return reference, False 

110 # Import local references, in case the references json file already exists 

111 imported_references = None 

112 if protein_references_file.exists: 

113 imported_references = import_references(protein_references_file) 

114 # Append the imported references to the overall references pool 

115 for k,v in imported_references.items(): 

116 references[k] = v 

117 # Get the structure chain sequences 

118 parsed_chains = get_parsed_chains(structure) 

119 # Find out which chains are protein 

120 protein_parsed_chains = [] 

121 for chain_data in parsed_chains: 

122 sequence = chain_data['sequence'] 

123 if next((letter for letter in sequence if letter != 'X'), None): 

124 chain_data['match'] = { 'ref': None, 'map': None, 'score': 0 } 

125 protein_parsed_chains.append(chain_data) 

126 # If there are no protein sequences then there is no need to map anything 

127 if len(protein_parsed_chains) == 0: 

128 print(' There are no protein sequences') 

129 return protein_parsed_chains 

130 # For each input forced reference, get the reference sequence 

131 reference_sequences = {} 

132 # Save already tried alignments to not repeat the alignment further 

133 tried_alignments = { chain_data['name']: [] for chain_data in protein_parsed_chains } 

134 # Set a function to try to match all protein sequences with the available reference sequences 

135 # In case of match, objects in the 'protein_parsed_chains' list are modified by adding the result 

136 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not 

137 def match_sequences () -> bool: 

138 # Track each chain-reference alignment match and keep the score of successful alignments 

139 # Now for each structure sequence, align all reference sequences and keep the best alignment (if it meets the minimum) 

140 for chain_data in protein_parsed_chains: 

141 chain = chain_data['name'] 

142 chain_tried_alignments = tried_alignments[chain] 

143 # In case references are forced per chain check if there is a reference for this chain and match according to this 

144 if strict_references: 

145 # Get the forced specific chain for this sequence, if any 

146 forced_reference = input_protein_references.get(chain, None) 

147 if forced_reference: 

148 # If the chain has a specific forced reference then we must align it just once 

149 # Skip this process in further matches 

150 if chain_data['match']['ref']: 

151 continue 

152 # In case the forced reference is the "no referable" flag 

153 # Thus it has no reference sequence and we must not try to match it 

154 # Actually, any match would be accidental and not correct 

155 if forced_reference == NO_REFERABLE_FLAG: 

156 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG } 

157 continue 

158 # In case the forced reference is the "not found" flag 

159 # This should not happend but we may be using references as forced references, so just in case 

160 if forced_reference == NOT_FOUND_FLAG: 

161 chain_data['match'] = { 'ref': NOT_FOUND_FLAG } 

162 continue 

163 # Get the forced reference sequence and align it to the chain sequence in order to build the map 

164 reference_sequence = reference_sequences[forced_reference] 

165 print(f' Aligning chain {chain} with {forced_reference} reference sequence') 

166 align_results = align(reference_sequence, chain_data['sequence']) 

167 # The align must match or we stop here and warn the user 

168 if not align_results: 

169 raise SystemExit(f'Forced reference {chain} -> {forced_reference} does not match in sequence') 

170 sequence_map, align_score = align_results 

171 reference = references[forced_reference] 

172 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score } 

173 continue 

174 # If the chain has already a match and this match is among the forced references then stop here 

175 # Forced references have priority and this avoids having a match with a not forced reference further 

176 if chain_data['match']['ref'] and chain_data['match']['ref']['uniprot'] in input_protein_references: 

177 continue 

178 # Iterate over the different available reference sequences 

179 for uniprot_id, reference_sequence in reference_sequences.items(): 

180 # If this alignment has been tried already then skip it 

181 if uniprot_id in chain_tried_alignments: 

182 continue 

183 # Align the structure sequence with the reference sequence 

184 # NEVER FORGET: This system relies on the fact that topology chains are not repeated 

185 print(f' Aligning chain {chain} with {uniprot_id} reference sequence') 

186 align_results = align(reference_sequence, chain_data['sequence']) 

187 tried_alignments[chain].append(uniprot_id) # Save the alignment try, no matter if it works or not 

188 if not align_results: 

189 continue 

190 # In case we have a valid alignment, check the alignment score is better than the current reference score (if any) 

191 sequence_map, align_score = align_results 

192 current_reference = chain_data['match'] 

193 if current_reference['score'] > align_score: 

194 continue 

195 # If the match is a 'no referable' exception then set a no referable flag 

196 if type(uniprot_id) == NoReferableException: 

197 chain_data['match'] = { 'ref': NO_REFERABLE_FLAG } 

198 continue 

199 # Proceed to set the corresponding reference toherwise 

200 reference = references[uniprot_id] 

201 # If the alignment is better then we impose the new reference 

202 chain_data['match'] = { 'ref': reference, 'map': sequence_map, 'score': align_score } 

203 # Sum up the current matching 

204 print(' Protein reference summary:') 

205 for chain_data in parsed_chains: 

206 name = chain_data['name'] 

207 match = chain_data.get('match', None) 

208 if not match: 

209 print(f' {name} -> Not protein') 

210 continue 

211 reference = chain_data['match'].get('ref', None) 

212 if not reference: 

213 print(f' {name} -> ¿?') 

214 continue 

215 if reference == NO_REFERABLE_FLAG: 

216 print(f' {name} -> No referable') 

217 continue 

218 if reference == NOT_FOUND_FLAG: 

219 print(f' {name} -> Not found') 

220 continue 

221 uniprot_id = reference['uniprot'] 

222 print(f' {name} -> {uniprot_id}') 

223 # Export already matched references 

224 export_references(protein_parsed_chains, protein_references_file) 

225 # Finally, return True if all protein sequences were matched with the available reference sequences or False if not 

226 allright = all([ chain_data['match']['ref'] for chain_data in protein_parsed_chains ]) 

227 # If we match all chains then make sure there is no forced reference missing which did not match 

228 # Otherwise stop here and force the user to remove these forced uniprot ids from the inputs file 

229 # WARNING: Although we could move on it is better to stop here and warn the user to prevent a future silent problem 

230 if allright and input_protein_references: 

231 # Get forced uniprot ids 

232 forced_uniprot_ids = set(list(input_protein_references.values()) if strict_references else input_protein_references) 

233 forced_uniprot_ids -= { NOT_FOUND_FLAG, NO_REFERABLE_FLAG } 

234 #forced_uniprot_ids.remove(NO_REFERABLE_FLAG) 

235 #forced_uniprot_ids.remove(NOT_FOUND_FLAG) 

236 # Get matched uniprot ids 

237 matched_references = [ chain_data['match']['ref'] for chain_data in protein_parsed_chains ] 

238 matched_uniprot_ids = set([ ref['uniprot'] for ref in matched_references if type(ref) == dict ]) 

239 # Check the difference 

240 unmatched_uniprot_ids = forced_uniprot_ids - matched_uniprot_ids 

241 if len(unmatched_uniprot_ids) > 0: 

242 log = ', '.join(unmatched_uniprot_ids) 

243 raise InputError(f'Some forced references were not matched with any protein sequence: {log}\n' 

244 ' Please remove them from the inputs file') 

245 return allright 

246 # --- End of match_sequences function -------------------------------------------------------------------------------- 

247 # First use the forced references for the matching 

248 if input_protein_references: 

249 forced_uniprot_ids = list(input_protein_references.values()) if strict_references else input_protein_references 

250 for uniprot_id in forced_uniprot_ids: 

251 # If instead of a uniprot id there is a 'no referable' flag then we skip this process 

252 if uniprot_id == NO_REFERABLE_FLAG: 

253 continue 

254 # If instead of a uniprot id there is a 'not found' flag then we skip this process 

255 # This should not happend but we may be using references as forced references, so just in case 

256 if uniprot_id == NOT_FOUND_FLAG: 

257 continue 

258 # If reference is already in the list (i.e. it has been imported) then skip this process 

259 reference = references.get(uniprot_id, None) 

260 if reference: 

261 reference_sequences[uniprot_id] = reference['sequence'] 

262 continue 

263 # Find the reference data for the given uniprot id 

264 reference, already_loaded = get_reference(uniprot_id) 

265 reference_sequences[uniprot_id] = reference['sequence'] 

266 # Save the current whole reference object for later 

267 references[reference['uniprot']] = reference 

268 # Now that we have all forced references data perform the matching 

269 # If we have every protein chain matched with a reference then we stop here 

270 print(' Using only forced references from the inputs file') 

271 if match_sequences(): 

272 return protein_parsed_chains 

273 # Now add the imported references to reference sequences. Thus now they will be 'matchable' 

274 # Thus now they will be 'matchable', so try to match sequences again in case any of the imported references has not been tried 

275 # It was not done before since we want forced references to have priority 

276 if imported_references: 

277 need_rematch = False 

278 for uniprot_id, reference in imported_references.items(): 

279 # If the imported reference has been aligned already (i.e. it was a forced reference) 

280 if uniprot_id in reference_sequences: 

281 continue 

282 # Otherwise, include it 

283 need_rematch = True 

284 reference_sequences[uniprot_id] = reference['sequence'] 

285 # If there was at least one imported reference missing then rerun the matching 

286 if need_rematch: 

287 print(' Using also references imported from references.json') 

288 if match_sequences(): 

289 return protein_parsed_chains 

290 # If there are still any chain which is not matched with a reference then we need more references 

291 # To get them, retrieve all uniprot ids associated to the pdb ids, if any 

292 if pdb_ids and len(pdb_ids) > 0: 

293 for pdb_id in pdb_ids: 

294 # Ask PDB 

295 uniprot_ids = pdb_to_uniprot(pdb_id) 

296 for uniprot_id in uniprot_ids: 

297 # If this is not an actual UniProt, but a no referable exception, then handle it 

298 # We must find the matching sequence and set the corresponding chain as no referable 

299 if type(uniprot_id) == NoReferableException: 

300 reference_sequences[uniprot_id] = uniprot_id.sequence 

301 continue 

302 # Build a new reference from the resulting uniprot 

303 reference, already_loaded = get_reference(uniprot_id) 

304 if reference == None: 

305 continue 

306 reference_sequences[reference['uniprot']] = reference['sequence'] 

307 # Save the current whole reference object for later 

308 references[reference['uniprot']] = reference 

309 # If we have every protein chain matched with a reference already then we stop here 

310 print(' Using also references related to PDB ids from the inputs file') 

311 if match_sequences(): 

312 return protein_parsed_chains 

313 # If there are still any chain which is not matched with a reference then we need more references 

314 # To get them, we run a blast with each orphan chain sequence 

315 for chain_data in protein_parsed_chains: 

316 # Skip already references chains 

317 if chain_data['match']['ref']: 

318 continue 

319 # Run the blast 

320 sequence = chain_data['sequence'] 

321 uniprot_id = blast(sequence) 

322 if not uniprot_id: 

323 chain_data['match'] = { 'ref': NOT_FOUND_FLAG } 

324 continue 

325 # Build a new reference from the resulting uniprot 

326 reference, already_loaded = get_reference(uniprot_id) 

327 reference_sequences[reference['uniprot']] = reference['sequence'] 

328 # Save the current whole reference object for later 

329 references[reference['uniprot']] = reference 

330 # If we have every protein chain matched with a reference already then we stop here 

331 print(' Using also references from blast') 

332 if match_sequences(): 

333 return protein_parsed_chains 

334 # At this point we should have macthed all sequences 

335 # If not, kill the process unless mercy was given 

336 must_be_killed = REFERENCE_SEQUENCE_FLAG not in mercy 

337 if must_be_killed: 

338 raise SystemExit('BLAST failed to find a matching reference sequence for at least one protein sequence') 

339 warn('BLAST failed to find a matching reference sequence for at least one protein sequence') 

340 register.add_warning(REFERENCE_SEQUENCE_FLAG, 'There is at least one protein region which is not mapped to any reference sequence') 

341 return protein_parsed_chains 

342 

343# Export reference objects data to a json file 

344# This file is used by the loader to load new references to the database 

345# Note that all references are saved to this json file, even those which are already in the database 

346# It is the loader who is the responsible to check which references must be loaded and which ones are loaded already 

347# Note that mapping data (i.e. which residue belongs to each reference) is not saved 

348def export_references (mapping_data : list, protein_references_file : 'File'): 

349 final_references = [] 

350 final_uniprots = [] 

351 for data in mapping_data: 

352 match = data['match'] 

353 ref = match['ref'] 

354 if not ref or ref == NO_REFERABLE_FLAG or ref == NOT_FOUND_FLAG: 

355 continue 

356 uniprot = ref['uniprot'] 

357 if uniprot in final_uniprots: 

358 continue 

359 final_references.append(ref) 

360 final_uniprots.append(uniprot) 

361 # If there are no references to exported then do not egenrate the json file 

362 if len(final_references) == 0: 

363 return 

364 # Write references to a json file 

365 save_json(final_references, protein_references_file.path, indent = 4) 

366 

367# Import reference json file so we do not have to rerun this process 

368def import_references (protein_references_file : 'File') -> list: 

369 print(' Importing references from ' + protein_references_file.path) 

370 # Read the file 

371 file_content = load_json(protein_references_file.path) 

372 # Format data as the process expects to find it 

373 references = {} 

374 for reference in file_content: 

375 uniprot = reference['uniprot'] 

376 references[uniprot] = reference 

377 return references 

378 

379# Get each chain name and aminoacids sequence in a topology 

380# Output format example: [ { 'sequence': 'VNLTT', 'indices': [1, 2, 3, 4, 5] }, ... ] 

381def get_parsed_chains (structure : 'Structure') -> list: 

382 parsed_chains = [] 

383 chains = structure.chains 

384 for chain in chains: 

385 name = chain.name 

386 sequence = '' 

387 residue_indices = [] 

388 for residue in chain.residues: 

389 letter = protein_residue_name_to_letter(residue.name) 

390 sequence += letter 

391 residue_indices.append(residue.index) 

392 sequence_object = { 'name': name, 'sequence': sequence, 'residue_indices': residue_indices } 

393 parsed_chains.append(sequence_object) 

394 return parsed_chains 

395 

396# Align two aminoacid sequences 

397# Return a list with the reference residue indexes (values) 

398# which match each new sequence residues indexes (indexes) 

399# Return also the score of the alignment 

400# Return None when there is not valid alignment at all 

401# Set verbose = True to see a visual summary of the sequence alignments in the logs 

402def align (ref_sequence : str, new_sequence : str, verbose : bool = False) -> Optional[ Tuple[list, float] ]: 

403 

404 #print('- REFERENCE\n' + ref_sequence + '\n- NEW\n' + new_sequence) 

405 

406 # If the new sequence is all 'X' stop here, since this would make the alignment infinite 

407 # Then an array filled with None is returned 

408 if all([ letter == 'X' for letter in new_sequence ]): 

409 return None 

410 

411 # Return the new sequence as best aligned as possible with the reference sequence 

412 #alignments = pairwise2.align.localds(ref_sequence, new_sequence, MatrixInfo.blosum62, -10, -0.5) 

413 # DANI: Habría que hacerlo de esta otra forma según el deprecation warning (arriba hay más código) 

414 # DANI: El problema es que el output lo tiene todo menos la sequencia en formato alienada 

415 # DANI: i.e. formato '----VNLTT' (con los gaps identificados con guiones), que es justo el que necesito 

416 # Actualized code for biopython >= 1.80 

417 alignments = aligner.align(ref_sequence, new_sequence) 

418 

419 # In case there are no alignments it means the current chain has nothing to do with this reference 

420 # Then an array filled with None is returned 

421 if len(alignments) == 0: 

422 return None 

423 

424 # Several alignments may be returned, specially when it is a difficult or impossible alignment 

425 # Output format example: '----VNLTT' 

426 best_alignment = alignments[0] 

427 best_alignment_with_gaps = add_leading_and_trailing_gaps(best_alignment) 

428 # Extract the allignment sequences 

429 #aligned_ref, aligned_sequence = best_alignment_with_gaps.sequences 

430 aligned_ref = best_alignment_with_gaps[0] # AGUS: Puede ser útil en el futuro 

431 aligned_sequence = best_alignment_with_gaps[1] 

432 # Obtain the score of the alignment 

433 score = best_alignment.score 

434 # Calculate the normalized score 

435 normalized_score = score / len(new_sequence) 

436 

437 # If the normalized score does not reaches the minimum we consider the alignment is not valid 

438 # It may happen when the reference goes for a specific chain but we must map all chains 

439 # This 1 has been found experimentally 

440 # Non maching sequence may return a 0.1-0.3 normalized score 

441 # Matching sequence may return >4 normalized score 

442 if normalized_score < 2: 

443 print(' Not valid alignment') 

444 return None 

445 

446 # Tell the user about the success 

447 beautiful_normalized_score = round(normalized_score * 100) / 100 

448 print(f' Valid alignment -> Normalized score = {beautiful_normalized_score}') 

449 

450 # Match each residue 

451 aligned_mapping = [] 

452 aligned_index = 0 

453 for l, letter in enumerate(aligned_sequence): 

454 # Guions are skipped 

455 if letter == '-': 

456 continue 

457 # Get the current residue of the new sequence 

458 equivalent_letter = new_sequence[aligned_index] 

459 if not letter == equivalent_letter: 

460 raise SystemExit(f'Something was wrong at position {l} :S') 

461 # 'X' residues cannot be mapped since reference sequences should never have any 'X' 

462 if letter == 'X': 

463 aligned_mapping.append(None) 

464 aligned_index += 1 

465 continue 

466 # Otherwise add the equivalent aligned index to the mapping 

467 # WARNING: Add +1 since uniprot residue counts start at 1, not 0 

468 aligned_mapping.append(l + 1) 

469 aligned_index += 1 

470 

471 return aligned_mapping, normalized_score 

472 

473# Given an aminoacids sequence, return a list of uniprot ids 

474# Note that we are blasting against UniProtKB / Swiss-Prot so results will always be valid UniProt accessions 

475# WARNING: This always means results will correspond to curated entries only 

476# If your sequence is from an exotic organism the result may be not from it but from other more studied organism 

477def blast (sequence : str) -> Optional[str]: 

478 print('Throwing blast...') 

479 result = NCBIWWW.qblast( 

480 program = "blastp", 

481 database = "swissprot", # UniProtKB / Swiss-Prot 

482 sequence = sequence, 

483 ) 

484 parsed_result = xmltodict.parse(result.read()) 

485 hits = parsed_result['BlastOutput']['BlastOutput_iterations']['Iteration']['Iteration_hits'] 

486 # When there is no result return None 

487 # Note that this is possible although hardly unprobable 

488 if not hits: 

489 return None 

490 # Get the first result only 

491 result = hits['Hit'][0] 

492 # Return the accession 

493 # DANI: Si algun día tienes problemas porque te falta el '.1' al final del accession puedes sacarlo de Hit_id 

494 accession = result['Hit_accession'] 

495 print('Result: ' + accession) 

496 return accession 

497 

498# Given a uniprot accession, use the MDposit API to request its data in case it is already in the database 

499# If the reference is not found or there is any error then return None and keep going, we do not really need this 

500def get_mdposit_reference (uniprot_accession : str, database_url : str) -> Optional[dict]: 

501 # Request MDposit 

502 request_url = f'{database_url}rest/v1/references/{uniprot_accession}' 

503 try: 

504 with urllib.request.urlopen(request_url) as response: 

505 parsed_response = json.loads(response.read().decode("utf-8")) 

506 # If the accession is not found in the database then we stop here 

507 except urllib.error.HTTPError as error: 

508 # If the uniprot accession is not yet in the MDposit references then return None 

509 if error.code == 404: 

510 return None 

511 else: 

512 warn(f'Something went wrong with the MDposit request {request_url} (error {error.code})') 

513 warn(f'Protein reference {uniprot_accession} will be made again even if it exists') 

514 return None 

515 # This error may occur if there is no internet connection 

516 except urllib.error.URLError as error: 

517 # The error variable as is do not work properly 

518 # Its 'errno' value is None (at least for tiemout errors) 

519 actual_error = error.args[0] 

520 if actual_error.errno == 110: 

521 print('Timeout error when requesting MDposit. Is the node fallen?') 

522 warn('Something went wrong with the MDposit request ' + request_url) 

523 warn(f'Protein reference {uniprot_accession} will be made again even if it exists') 

524 return None 

525 return parsed_response 

526 

527# Given a uniprot accession, use the uniprot API to request its data and then mine what is needed for the database 

528def get_uniprot_reference (uniprot_accession : str) -> Optional[dict]: 

529 # Request Uniprot 

530 request_url = 'https://www.ebi.ac.uk/proteins/api/proteins/' + uniprot_accession 

531 parsed_response = None 

532 try: 

533 with urllib.request.urlopen(request_url) as response: 

534 parsed_response = json.loads(response.read().decode("utf-8")) 

535 # If the accession is not found in UniProt then the id is not valid 

536 except urllib.error.HTTPError as error: 

537 if error.code == 404: 

538 print('WARNING: Cannot find UniProt entry for accession ' + uniprot_accession) 

539 return None 

540 print('Error when requesting ' + request_url) 

541 raise ValueError(f'Something went wrong with the Uniprot request (error {error.code})') 

542 # If we have not a response at this point then it may mean we are trying to access an obsolete entry (e.g. P01607) 

543 if parsed_response == None: 

544 print('WARNING: Cannot find UniProt entry for accession ' + uniprot_accession) 

545 return None 

546 # Get the full protein name 

547 protein_data = parsed_response['protein'] 

548 protein_name_data = protein_data.get('recommendedName', None) 

549 # DANI: It is possible that the 'recommendedName' is missing if it is not a reviewed UniProt entry 

550 if not protein_name_data: 

551 warn(f'The UniProt accession {uniprot_accession} is missing the recommended name. You should consider changing the reference.') 

552 protein_name_data = protein_data.get('submittedName', None)[0] 

553 if not protein_name_data: 

554 raise ValueError('Unexpected structure in UniProt response for accession ' + uniprot_accession) 

555 protein_name = protein_name_data['fullName']['value'] 

556 # Get the gene names as a single string 

557 gene_names = [] 

558 # WARNING: Some uniprot entries are missing gene names (e.g. P00718) 

559 # WARNING: Some uniprot entries have names in non-canonical keys (e.g. orfNames, olnNames) 

560 # These names are not shown event in the uniprot web page so we also skip them 

561 genes = parsed_response.get('gene', []) 

562 for gene in genes: 

563 gene_name = gene.get('name', None) 

564 if not gene_name: 

565 continue 

566 gene_names.append(gene_name['value']) 

567 gene_names = ', '.join(gene_names) if len(gene_names) > 0 else None 

568 # Get the organism name 

569 organism = parsed_response['organism']['names'][0]['value'] 

570 # Get the aminoacids sequence 

571 sequence = parsed_response['sequence']['sequence'] 

572 # Get interesting regions to be highlighted in the client 

573 domains = [] 

574 # WARNING: Some uniprot entries are missing features (e.g. O27908) 

575 features = parsed_response.get('features', []) 

576 # Get comments data which may be useful to further build domain descriptions 

577 comments_data = parsed_response.get('comments', None) 

578 # There are many types of features but in this case we will focus on domanins and similar 

579 target_types = ['CHAIN', 'REGION', 'DOMAIN', 'MOTIF', 'SITE'] 

580 for feature in features: 

581 # Skip features of other types 

582 if feature['type'] not in target_types: 

583 continue 

584 # Get the domain name 

585 name = feature['description'] 

586 # Build the domain description from the coments data 

587 description = None 

588 if comments_data: 

589 comments = [ comment for comment in parsed_response['comments'] if name == comment.get('molecule', None) ] 

590 comment_text = [] 

591 for comment in comments: 

592 if not comment.get('text', False): continue 

593 text = comment.get('text', None) 

594 if text == None: raise ValueError('Unexpected UniProt response format: no text in comment') 

595 # DANI: el comment 'text' casi siempre es una lista 

596 # DANI: solo tengo constancia de una vez en que era un string directamente 

597 # DANI: en uno de los comentarios de https://www.ebi.ac.uk/proteins/api/proteins/Q15465 

598 if type(text) == str: comment_text.append(text) 

599 elif type(text) == list: comment_text.append(text[0]['value']) 

600 else: raise ValueError('Unexpected UniProt response format: text in comment is neither str or list') 

601 description = '\n\n'.join(comment_text) 

602 # Set the domain selection 

603 # The domain 'begin' and 'end' values may include non-numeric symbols such as '~', '>' or '<' 

604 # These values are usually ignored or replaced by '?' in the UniProt web client 

605 # There may be not numeric value at all (e.g. Q99497) 

606 # In these cases uniprot shows the domain but it does not allow to use its functionallities  

607 # e.g. you can not blast using the domain sequence 

608 # It makes not sense having a domain with unkown range to me so we skip these domains 

609 begin = feature['begin'].replace('~', '').replace('>', '').replace('<', '') 

610 end = feature['end'].replace('~', '').replace('>', '').replace('<', '') 

611 if begin == '' or end == '': 

612 continue 

613 selection = begin + '-' + end 

614 # If we already have a domain with the same name then join both domains 

615 # For instance, you may have several repetitions of the 'Disordered' region 

616 already_existing_domain = next((domain for domain in domains if domain['name'] == name), None) 

617 if already_existing_domain: 

618 already_existing_domain['selection'] += ', ' + selection 

619 continue 

620 # Otherwise, create a new domain 

621 domain = { 

622 'name': name, 

623 'selection': selection 

624 } 

625 # Add a description only if we succesfully mined it 

626 # Note that only features tagged as CHAIN have comments (not sure about this) 

627 if description: 

628 domain['description'] = description 

629 # Add the new domain to the list 

630 domains.append(domain) 

631 # Mine protein functions from Gene Ontology references 

632 # Get database references 

633 db_references = parsed_response['dbReferences'] 

634 # Get references from Gene Ontology (GO) only 

635 go_references = [ ref for ref in db_references if ref['type'] == 'GO' ] 

636 # A Gene Ontology entry may be one of three types: 

637 # Cellular Component (C), Molecular Function (F) and Biological Process (P) 

638 # In this case we are interested in protein function only so we will keep those with the 'F' header 

639 functions = [ ref['properties']['term'][2:] for ref in go_references if ref['properties']['term'][0:2] == 'F:' ] 

640 # Set the final reference to be uploaded to the database 

641 return { 

642 'name': protein_name, 

643 'gene': gene_names, 

644 'organism': organism, 

645 'uniprot': uniprot_accession, 

646 'sequence': sequence, 

647 'domains': domains, 

648 'functions': functions 

649 } 

650 

651# Given a pdb Id, get its uniprot id 

652# e.g. 6VW1 -> Q9BYF1, P0DTC2, P59594 

653def pdb_to_uniprot (pdb_id : str) -> List[ Union[str, NoReferableException] ]: 

654 # Set the request query 

655 query = '''query ($id: String!) { 

656 entry(entry_id: $id) { 

657 polymer_entities { 

658 rcsb_polymer_entity_container_identifiers { uniprot_ids } 

659 rcsb_entity_source_organism { scientific_name } 

660 entity_poly { pdbx_seq_one_letter_code } 

661 } 

662 } 

663 }''' 

664 # Request PDB data 

665 parsed_response = request_pdb_data(pdb_id, query) 

666 # Mine data 

667 uniprot_ids = [] 

668 for polymer in parsed_response['polymer_entities']: 

669 identifier = polymer['rcsb_polymer_entity_container_identifiers'] 

670 # Get the uniprot 

671 uniprots = identifier.get('uniprot_ids', None) 

672 # If there are not UniProts at all in this entity skip to the next 

673 if not uniprots: 

674 # However, it may happen the the polymer organism is set as synthetic construct 

675 # In this case we keep the sequence and return a no referable exception 

676 organisms = polymer.get('rcsb_entity_source_organism', None) 

677 if not organisms: raise ValueError('Missing organism') 

678 # Get scientific names in lower caps since sometimes they are all upper caps 

679 scientific_names = set( 

680 [ organism.get('scientific_name', '').lower() for organism in organisms ]) 

681 if 'synthetic construct' in scientific_names: 

682 entity = polymer.get('entity_poly', None) 

683 if not entity: raise ValueError(f'Missing entity in {pdb_id}') 

684 sequence = entity.get('pdbx_seq_one_letter_code', None) 

685 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}') 

686 uniprot_ids.append( NoReferableException(sequence) ) 

687 continue 

688 # If we have multiple uniprots in a single entity then we skip them 

689 # Note tha they belong to an entity which is no referable (e.g. a chimeric entity) 

690 # See 5GY2 and 7E2Z labeled as chimeric entities and 6e67, which is not labeled likewise 

691 if len(uniprots) > 1: 

692 warn(f'Multiple UniProt ids in the same entity in {pdb_id} -> Is this a chimeric entity?') 

693 entity = polymer.get('entity_poly', None) 

694 if not entity: raise ValueError(f'Missing entity in {pdb_id}') 

695 sequence = entity.get('pdbx_seq_one_letter_code', None) 

696 if not sequence: raise ValueError(f'Missing sequence in {pdb_id}') 

697 uniprot_ids.append( NoReferableException(sequence) ) 

698 continue 

699 uniprot_id = uniprots[0] 

700 uniprot_ids.append(uniprot_id) 

701 actual_uniprot_ids = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == str ] 

702 if len(actual_uniprot_ids) > 0: 

703 print(f' UniProt ids for PDB id {pdb_id}: ' + ', '.join(actual_uniprot_ids)) 

704 else: print(f' No UniProt ids were found for PDB id {pdb_id}') 

705 no_refs = [ uniprot_id for uniprot_id in uniprot_ids if type(uniprot_id) == NoReferableException ] 

706 no_refs_count = len(no_refs) 

707 if no_refs_count > 0: 

708 print(f' Also encountered {no_refs_count} no refereable sequences for PDB id {pdb_id}') 

709 return uniprot_ids 

710 

711# This function is used by the generate_metadata script 

712# 1. Get structure sequences 

713# 2. Calculate which reference domains are covered by the previous sequence 

714# 3. In case it is a covid spike, align the previous sequence against all saved variants (they are in 'utils') 

715from model_workflow.resources.covid_variants import covid_spike_variants 

716def get_sequence_metadata (structure : 'Structure', protein_references_file : 'File', residue_map : dict) -> dict: 

717 # Mine sequences from the structure 

718 sequences = [] 

719 # Classify sequences according to if they belong to protein or nucleic sequences 

720 # WARNING: We are interested in unique sequence BUT we also want to keep the order 

721 # WARNING: Do NOT use sets here to the order of appearance in the structure is respected 

722 protein_sequences = [] 

723 nucleic_sequences = [] 

724 # Iterate structure chains 

725 for chain in structure.chains: 

726 # Get the current chain sequence and add it to the list 

727 sequence = chain.get_sequence() 

728 sequences.append(sequence) 

729 # Get the chain classification 

730 classification = chain.get_classification() 

731 # Depending on what it is, add the sequence also in the corresponding list 

732 if classification == 'protein' and sequence not in protein_sequences: 

733 protein_sequences.append(sequence) 

734 elif (classification == 'dna' or classification == 'rna') and sequence not in nucleic_sequences: 

735 nucleic_sequences.append(sequence) 

736 # Get values from the residue map 

737 # Get protein references from the residues map 

738 reference_ids = [] 

739 references = residue_map['references'] 

740 if references and len(references) > 0: 

741 for ref, ref_type in zip(references, residue_map['reference_types']): 

742 if ref_type == 'protein': 

743 reference_ids.append(ref) 

744 residue_reference_numbers = residue_map['residue_reference_numbers'] 

745 residue_reference_indices = residue_map['residue_reference_indices'] 

746 # Load references data, which should already be save to the references data file 

747 references_data = import_references(protein_references_file) if protein_references_file.exists else {} 

748 # In case we have the SARS-CoV-2 spike among the references, check also which is the variant it belongs to 

749 # DANI: Esto es un parche. En un futuro buscaremos una manera mejor de comprovar variantes en cualquier contexto 

750 variant = None 

751 covid_spike_reference_id = 'P0DTC2' 

752 if covid_spike_reference_id in reference_ids: 

753 # Load the reference data and find a chain which belongs to the spike 

754 # We consider having only one variant, so all chains should return the same result 

755 reference_data = references_data[covid_spike_reference_id] 

756 covid_spike_reference_index = reference_ids.index(covid_spike_reference_id) 

757 sample_residue_index = next((residue_index for residue_index, reference_index in enumerate(residue_reference_indices) if reference_index == covid_spike_reference_index), None) 

758 if sample_residue_index == None: 

759 raise SystemExit('Failed to find residue belonging to SARS-CoV-2 variant') 

760 sample_chain_sequence = structure.residues[sample_residue_index].chain.get_sequence() 

761 # Align the sample chain sequence against all variants to find the best match 

762 highest_score = 0 

763 print('Finding SARS-CoV-2 variant') 

764 for variant_name, variant_sequence in covid_spike_variants.items(): 

765 print(f' Trying with {variant_name}') 

766 align_results = align(variant_sequence, sample_chain_sequence) 

767 if not align_results: 

768 continue 

769 mapping, score = align_results 

770 if score > highest_score: 

771 highest_score = score 

772 variant = variant_name 

773 # At this point there should be a result 

774 if not variant: 

775 raise SystemExit('Something went wrong trying to find the SARS-CoV-2 variant') 

776 print(f'It is {variant}') 

777 # Set which reference domains are present in the structure 

778 domains = [] 

779 for reference_index, reference_id in enumerate(reference_ids): 

780 # If this is not referable then there are no domains to mine 

781 if reference_id == NO_REFERABLE_FLAG: 

782 continue 

783 # If the reference was not found then there are no domains to mine 

784 if reference_id == NOT_FOUND_FLAG: 

785 continue 

786 # Get residue numbers of residues which belong to the current reference in th residue map 

787 residue_numbers = [] 

788 for residue_index, residue_reference_index in enumerate(residue_reference_indices): 

789 if residue_reference_index == reference_index: 

790 residue_numbers.append(residue_reference_numbers[residue_index]) 

791 # Set a function to find if any of the residue numbers is within a given range 

792 def in_range (start : int, end : int) -> bool: 

793 for residue_number in residue_numbers: 

794 if residue_number >= start and residue_number <= end: 

795 return True 

796 return False 

797 # Get the reference data for the current reference uniprot id 

798 reference_data = references_data.get(reference_id, None) 

799 if not reference_data: 

800 raise SystemExit(reference_id + ' is not in the references data file') 

801 # Iterate over data domains 

802 for domain in reference_data['domains']: 

803 selection = domain['selection'] 

804 # Iterate over the residue ranges in the selection field 

805 residue_ranges = selection.split(', ') 

806 for residue_range in residue_ranges: 

807 start, end = [ int(num) for num in residue_range.split('-') ] 

808 # If this range includes any resiudes in the residue map then the domain is included in the list 

809 if in_range(start, end): 

810 domains.append(domain['name']) 

811 break 

812 # Return the sequence matadata 

813 return { 

814 'sequences': sequences, 

815 'protein_sequences': protein_sequences, 

816 'nucleic_sequences': nucleic_sequences, 

817 'domains': domains, 

818 'cv19_variant': variant 

819 }