Coverage for model_workflow/analyses/helical_parameters.py: 89%

472 statements  

coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1import numpy as np 

2import os 

3from os.path import exists 

4import pandas as pd 

5import math 

6import subprocess 

7from shutil import move 

8import glob 

9from struct import pack 

10import re 

11from warnings import warn 

12from model_workflow.utils.auxiliar import save_json, store_binary_data 

13from model_workflow.utils.constants import OUTPUT_HELICAL_PARAMETERS_FILENAME 

14from model_workflow.utils.type_hints import * 

15 

16from model_workflow.tools.nassa_loaders import load_sequence2 

17from model_workflow.tools.nassa_loaders import load_serfile 

18 

19conda_prefix = os.environ['CONDA_PREFIX'] 

20# If this path does not exist then it means Curves+ is not installed 

21curves_path = conda_prefix + '/.curvesplus' 

22# Note that this is not a file, but the prefix to 3 different files 

23standard_prefix = curves_path + '/standard' 

24 

25# NOTE THAT THIS CODE WAS DEVELOPED AND IMPLEMENTED IN THIS WORKFLOW USING CODE FROM THE IRB BARCELONA BIOBB WORKFLOWS AS A TEMPLATE 

26# LINK TO THEIR WEBPAGE WITH MORE WORKFLOWS COMPUTING OTHER ANALYSES 

27# https://mmb.irbbarcelona.org/biobb/workflows 

28 

29# LINK TO THE GITHUB REPOSITORY WITH THE PYTHON CODE 

30# https://github.com/bioexcel/biobb_wf_dna_helparms 

31 

32# LINK TO THE DOCUMENTATION ON HOW TO EXECUTE CURVES+ AND CANAL, THE INPUT NEEDED FOR BOTH, AND HOW TO INTERPRET THEIR OUTPUTS 

33# http://gbio-pbil.ibcp.fr/tools/curves_plus/canal-user-guide.html 

34 
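# Overview (summary of the steps below): the analysis runs cpptraj (nastruct) to extract per-base-pair hydrogen bonds,
# then Curves+ and Canal to produce per-parameter .ser time series, and finally aggregates averages, backbone
# populations and stiffness into a single JSON output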

35# List of the file names that correspond to single bases 

36hp_singlebases = [ 

37 "shear", 

38 "stagger", 

39 "stretch", 

40 "buckle", 

41 "opening", 

42 "propel", 

43 "inclin", 

44 "tip", 

45 "xdisp", 

46 "ydisp" 

47] 

48 

49# List of the file names that correspond to base pairs 

50hp_basepairs = [ 

51 "majw", 

52 "majd", 

53 "minw", 

54 "mind", 

55 "rise", 

56 "shift", 

57 "slide", 

58 "roll", 

59 "tilt", 

60 "twist" 

61] 

62 

63# List of the file names for which we will use degree units 

64hp_angular = [ 

65 "roll", 

66 "tilt", 

67 "twist", 

68 "buckle", 

69 "opening", 

70 "propel", 

71 "inclin", 

72 "tip" 

73] 

74 

75# List of the file names for which we will use angstrom units 

76hp_translational = [ 

77 "shear", 

78 "stagger", 

79 "stretch", 

80 "xdisp", 

81 "ydisp", 

82 "majw", 

83 "majd", 

84 "minw", 

85 "mind", 

86 "rise", 

87 "shift", 

88 "slide" 

89] 

90 

91# List of the file names corresponding to the Backbone block of helical parameters 

92hp_backbone = [ 

93 "alphac", 

94 "alphaw", 

95 "betac", 

96 "betaw", 

97 "gammac", 

98 "gammaw", 

99 "deltac", 

100 "deltaw", 

101 "chic", 

102 "chiw", 

103 "phasec", 

104 "phasew", 

105 "epsilc", 

106 "epsilw", 

107 "zetac", 

108 "zetaw" 

109] 

110 

111helical_parameters = hp_basepairs + hp_singlebases + hp_backbone 

112baselen = 0 

113hp_unit = "" 

114 

115def helical_parameters ( 

116 topology_file : 'File', 

117 trajectory_file : 'File', 

118 structure_file : 'File', 

119 output_directory : str, 

120 structure : 'Structure', 

121 frames_limit : Optional[int] = None, 

122 dna_selection : Optional[str] = None 

123): 

124 """Helical parameters analysis.""" 

125 # Set the main output filepath 

126 output_analysis_filepath = f'{output_directory}/{OUTPUT_HELICAL_PARAMETERS_FILENAME}' 

127 

128 # Save the original path 

129 actual_path = os.getcwd() 

130 

131 # Get a selection from both chains in the B-DNA 

132 selection = None 

133 # In case a selection was provided then use it 

134 if dna_selection: 

135 selection = structure.select(dna_selection, syntax='vmd') 

136 # Otherwise, get all nucleic acids 

137 else: 

138 selection = structure.select_nucleic() 

139 

140 # If there is nothing to analyze then we return here 

141 if not selection: 

142 print(' There are no nucleic acids') 

143 return 

144 

145 # Check curves is installed 

146 if not exists(curves_path): 

147 raise SystemExit(' Cannot find curves path. Is Curves+ installed?') 

148 

149 # Get the sequence from the selected chains 

150 chain_indices = structure.get_selection_chain_indices(selection) 

151 if len(chain_indices) != 2: 

152 raise SystemExit('Expected exactly 2 chains but found a different number. Is this canonical B-DNA? Make sure every strand has an independent chain') 

153 sequences = [] 

154 residue_index_ranges = [] 

155 for c, chain_index in enumerate(chain_indices): 

156 chain = structure.chains[chain_index] # Obtain Chain of nucleic acids 

157 sequences.append(chain.get_sequence()) # Retrieve its sequence 

158 first_residue_index = chain.residue_indices[0] +1 

159 last_residue_index = chain.residue_indices[-1] +1 

160 residue_index_range = (first_residue_index, last_residue_index) if c == 0 else (last_residue_index, first_residue_index) 

161 residue_index_ranges.append(residue_index_range) 
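 # Note that the second chain's residue range is reversed (last, first), presumably so Curves+ receives the
 # complementary strand in its antiparallel order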

162 

163 # Run the hydrogen bond analysis to generate .dat file 

164 hydrogen_bonds( 

165 topology_file, 

166 trajectory_file, 

167 output_analysis_filepath, 

168 output_directory, 

169 structure_file, 

170 ) 

171 

172 terminal_execution( 

173 trajectory_file.path, 

174 structure_file.path, 

175 residue_index_ranges, 

176 sequences[0], 

177 output_directory 

178 ) 

179 # Save in a dictionary all the computations done by the different functions called by send_files function 

180 dictionary_information = send_files(sequences[0], frames_limit, output_directory) 

181 # Go back to the original directory, outside the helical parameters folder 

182 os.chdir(actual_path) 

183 # Convert the dictionary into a json file 

184 # DANI: We are getting NaNs which are not supported further downstream; they must be removed 

185 save_json(dictionary_information, output_analysis_filepath) 

186 

187# Function to execute cpptraj and calculate .dat file (Hydrogen Bonds) 

188def hydrogen_bonds( 

189 topology_file : 'File', 

190 trajectory_file : 'File', 

191 output_analysis_filepath : str, 

192 output_directory : str, 

193 structure_file : 'File', 

194): 

195 # This analysis is done using cpptraj in two steps 

196 # 1. Create the cpptraj.in file to generate the dry trajectory (cpptraj_dry.inpcrd) 

197 # parm_path = f"/orozco/projects/ABCix/ProductionFiles"  

198 # setup_path = f"/orozco/projects/ABCix/ProductionFiles/SETUP"  

199 

200 # # Config cpptraj 

201 # cpptraj_in_file = f"{output_directory}/cpptraj.in" 

202 # cpptraj_out_file = f"{output_directory}/cpptraj_dry.inpcrd" 

203 

204 # if not os.path.exists(cpptraj_out_file): 

205 

206 

207 # # Exec cpptraj 

208 # cpptraj_command = f"cpptraj {cpptraj_in_file}"  

209 # res = subprocess.run(cpptraj_command, shell=True) 

210 

211 

212 ################ 

213 # 2. From the dry.inpcrd file we will generate the .dat file 

214 # Create the helical parameters folder 

215 # If the folder already exists don't create it again 

216 output_dat_file = os.path.join(output_directory, "mdf.nahbonds.dat") 

217 

218 print(f"Saving .dat file to: {output_dat_file}") 

219 

220 # Select reference structure 

221 # AGUS: for the ABCix project we used the simulation inpcrd files as reference because using our PDB produced many errors 

222 inpcrd_file = glob.glob(os.path.join(output_directory, "*.inpcrd")) 

223 if len(inpcrd_file) == 0: 

224 warn(f"No inpcrd file found in the helical_parameters folder '{output_directory}', executing cpptraj with structure PDB.") 

225 structure_reference = structure_file.path 

226 else: 

227 structure_reference = inpcrd_file[0] # Select the first matching file 

228 

229 # Create the cpptraj input content using the variables 

230 cpptraj_script = f""" 

231 # Load the topology file 

232 parm {topology_file.path} 

233 

234 # Load the initial reference structure 

235 trajin {structure_reference} 

236 

237 # Load the trajectory 

238 trajin {trajectory_file.path} 

239 

240 # Run the NA structure analysis 

241 nastruct NA 

242 

243 # Run the command to process the trajectory 

244 run 

245 

246 # Write out hydrogen bond information to a file 

247 writedata {output_dat_file} NA[hb] 

248 

249 # Quit 

250 quit 

251 """ 

252 print("Executing cpptraj...") 

253 # Execute cpptraj with the generated script 

254 subprocess.run(["cpptraj"], input=cpptraj_script, text=True, check=True) 
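 # The resulting .dat is expected to contain one row per frame and one column per base pair with the number of
 # hydrogen bonds reported by nastruct (small integers, typically 0-3)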

255 

256 # Check if the output file was created 

257 if not os.path.exists(output_dat_file): 

258 raise SystemExit(f"Error: {output_dat_file} was not created. Check the cpptraj command and input files.") 

259 

260 # Renumber the dat file because the first line is repeated 

261 # AGUS: we had many problems with this in the ABCix project and in the end we chose to remove the first line and renumber everything 

262 # AGUS: in addition, it also served to compress the file and read it more easily 

263 def renumber_dat_file(file_path): 

264 with open(file_path, 'r') as file: 

265 lines = file.readlines() 

266 

267 counter = 1 

268 first = True 

269 

270 renumbered_lines = [] 

271 for line in lines: 

272 if line.strip().startswith('#'): 

273 renumbered_lines.append(line) 

274 elif first: 

275 first = False 

276 continue 

277 else: 

278 renumbered_lines.append(f"{counter} {' '.join(map(str, line.split()[1:]))}\n") 

279 counter += 1 

280 

281 with open(file_path, 'w') as file: 

282 file.writelines(renumbered_lines) 

283 print("Renumbering the dat file...") 

284 # Renumber the dat file 

285 renumber_dat_file(output_dat_file) 

286 

287 # Set the function to compress the dat file to .bin file 

288 def compress_dat_to_bin(output_dat_file): 

289 data = [] 

290 with open(output_dat_file, 'r') as file: 

291 # Identify which columns we must mine using the headers 

292 headers = next(file).strip().split() 

293 column_order = [] 

294 n_from = 1 

295 n_to = 40 

296 for h, header in enumerate(headers): 

297 target = re.compile(f'{n_from}[ACGT]{n_to}[ACGT]') 

298 if not target.match(header): continue 

299 column_order.append(h) 

300 n_from += 1 

301 n_to -= 1 
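 # For illustration: the regex keeps only the columns whose header pairs base i of one strand with base 41-i of the
 # other, e.g. headers such as '1G40C', '2C39G', ... for a 20 bp duplex (the letters depend on the actual sequence)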

302 # Now iterate the rest of lines which include the actual values 

303 # Initialize the number of frames to count all of them 

304 n_frames = 0 

305 for line in file: 

306 values = line.strip().split() 

307 for c in column_order: data.append(int(values[c])) 

308 n_frames += 1 

309 

310 # Set the name of the binary dat file 

311 bin_file_path = output_dat_file.replace('.dat', '.bin') 

312 

313 # In this case we store each hydrogen bond value in 2 bits 

314 bit_size = 2 

315 
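 # Worked example of the 2-bit packing below: four values fit in one byte, e.g. the values [1, 0, 2, 1]
 # become the bits '01' + '00' + '10' + '01' = '01001001', which is written as the single byte 73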

316 # Start writing the output file 

317 with open(bin_file_path, 'wb') as file: 

318 bit_count = 0 

319 current_byte = '' 

320 # Iterate over data list values 

321 for value in data: 

322 # Parse the value to binary and make sure the binary is as long as the bit size 

323 bits = format(value, 'b').zfill(bit_size) 

324 if len(bits) != bit_size: 

325 raise ValueError(f'Value {value} cannot be stored in {bit_size} bits') 

326 # Add bits one by one to the current byte to be written 

327 for bit in bits: 

328 current_byte += bit 

329 bit_count += 1 

330 # If the current byte is full then write it to the output file 

331 if bit_count == 8: 

332 #print(current_byte + ' -> ' + str(int(current_byte, 2))) 

333 file.write(pack('<B', int(current_byte, 2))) 

334 current_byte = '' 

335 bit_count = 0 

336 # If last byte is truncated then fill it with 0s and write it 

337 if bit_count != 0: 

338 last_byte = current_byte.ljust(8, '0') 

339 file.write(pack('<B', int(last_byte, 2))) 

340 

341 # Set the name of the meta file and create the meta data corresponding to the binary dat file 

342 meta_file_path = bin_file_path + '.meta.json' 

343 meta_data = { 

344 'x': { 

345 'name': 'bases', 

346 'length': 20 

347 }, 

348 'y': { 

349 'name': 'frames', 

350 'length': n_frames 

351 }, 

352 'bitsize': 2, 

353 } 

354 save_json(meta_data, meta_file_path) 

355 

356 # Compress the dat file to .bin file 

357 compress_dat_to_bin(output_dat_file) 

358 print("Dat file compressed to bin file, removing original dat file...") 

359 # Remove the original dat file 

360 #os.remove(output_dat_file) 

361 

362def terminal_execution( 

363 trajectory_filepath: 'str', 

364 topology_filepath: 'str', 

365 strand_indexes: 'list', 

366 sequence: 'str', 

367 output_directory: 'str'): 

368 """Function to execute Curves+ and Canals software to generate the output needed.""" 

369 

370 # Purge residual files from previous runs 

371 possible_residual_filenames = glob.glob(output_directory+'/*.cda') \ 

372 + glob.glob(output_directory+'/*.lis') \ 

373 + glob.glob(output_directory+'/*.ser') 

374 for filename in possible_residual_filenames: 

375 os.remove(filename) 

376 

377 # Set all the instructions with the desired commands; keep in mind that more commands could be included to obtain other results and files 

378 instructions = [ 

379 "Cur+ <<!", 

380 " &inp", 

381 f"file={trajectory_filepath}", 

382 f"ftop={topology_filepath}", 

383 f"lis={output_directory}/test", 

384 f"lib={standard_prefix}", 

385 " &end", 

386 "2 1 -1 0 0", 

387 f"{strand_indexes[0][0]}:{strand_indexes[0][1]}", 

388 f"{strand_indexes[1][0]}:{strand_indexes[1][1]}", 

389 "!" 

390 ] 

391 instructions = ["\n".join(instructions)] 

392 cmd = " ".join(instructions) 

393 print(' Running curves') 

394 # Execute the software using the instructions written previously 

395 process = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True,executable=os.getenv('SHELL', '/bin/sh')) 

396 

397 out, err = process.communicate() 

398 process.wait() 

399 out.decode("utf-8") 

400 

401 # Enter the helical parameters folder to execute the Canal software, as it needs the Curves+ output as input 

402 os.chdir(output_directory) 

403 

404 # If output has not been generated then warn the user 

405 if not os.path.exists('test.cda'): 

406 raise SystemExit('Something went wrong with Curves+ software') 

407 # Change the file name, as Canal just takes the name of the .cda file to differentiate between .lis and .cda 

408 move("test.cda","cinput.cda") 

409 # We set level1 and level2 to 0: if lev1=lev2=0, lev1 is set to 1 and lev2 is set to the length of the oligomer 

410 level1 = 0 

411 level2 = 0 

412 # We just want to compute series files, but if you want more output just use the link written before  

413 # And include them in the instructions similarly to series input 

414 

415 series = ".t." 
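 # Note: '.t.' is Fortran namelist notation for logical true, so series=.t. asks Canal to write the per-parameter .ser time series files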

416 instructions2 = [ 

417 "Canal <<! ", 

418 "&inp", 

419 " lis=canal_output,", 

420 f" lev1={level1},lev2={level2},", 

421 f" series={series},", 

422 "&end", 

423 f"cinput {sequence}", 

424 "!" 

425 ] 

426 instructions2 = ["\n".join(instructions2)] 

427 cmd2 = " ".join(instructions2) 

428 #logs = subprocess.run(instructions,stderr=subprocess.PIPE).stderr.decode() 

429 print(' Running canals') 

430 process = subprocess.Popen(cmd2,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True,executable=os.getenv('SHELL', '/bin/sh')) 

431 

432 out, err = process.communicate() 

433 process.wait() 

434 out.decode("utf-8") 

435 

436 # If output has not been generated then warn the user 

437 if not os.path.exists('canal_output_phaseC.ser'): 

438 raise SystemExit('Something went wrong with Canals software') 

439 

440def send_files(sequence: str, frames_limit: int, output_directory: str): 

441 files_averages = [] 

442 files_backbones = [] 

443 files_allbackbones = [] 

444 

445 # Iterate over all files in the directory 

446 for file in os.listdir(): 

447 if not file.endswith(".ser"): # Check that we are going to analyze the correct files, that have .ser extension 

448 continue 

449 # data = [] 

450 # with open(file, 'r') as ser_file: 

451 # # Skip the first line since it is only the headers 

452 # n_lines = 0 

453 # for line in ser_file: 

454 # # Skip the first line since it is only the row number 

455 # numbers = [ s for s in line.strip().split()[1:] ] 

456 # data += numbers 

457 # n_lines += 1 

458 # # if len(data) > 1000: 

459 # # break 

460 # # Save the file with the same name but with .bin extension and mdf prefix 

461 # file_name = 'mdf.' + file 

462 # file_path = os.path.join(os.getcwd(), file_name.replace('.ser', '.bin')) 

463 # # Store data in a binary format 

464 # store_binary_data( 

465 # data=data, 

466 # byte_size=4, # 32 bits 

467 # filepath=file_path 

468 # ) 

469 # # We need the meta file to store the information about the binary file (with the same name and .meta.json extension) 

470 # name_meta_file = file_path + '.meta.json' 

471 # meta_data = { 

472 # 'x': { 

473 # 'name': 'bases', 

474 # 'length': len(numbers) # Number of columns. It is the last value of the bucle for line in ser_file (maybe it causes problems) 

475 # }, 

476 # 'y': { 

477 # 'name': 'frames', 

478 # 'length': n_lines # Total number of lines in the file os the number of frames 

479 # }, 

480 # 'bitsize': 32, # 32 bits 

481 # } 

482 # save_json(meta_data, name_meta_file) 

483 

484 

485 # THIS PART OF THE CODE IS TO ANALYZE THE FILES AND COMPUTE THE AVERAGES, STANDARD DEVIATIONS AND TIME SERIES 

486 # BUT IT SHOULD BE REMOVED 

487 words = file.split('.') 

488 helpar = words[0].split('_')[-1].lower() 

489 # Check to which block the file came from 

490 if helpar in ["shear","stagger","stretch","buckle","opening","propel","inclin","tip","xdisp","ydisp", 

491 "majw","majd","minw","mind","rise","shift","slide","roll","tilt","twist"]: 

492 

493 files_averages.append(file) 

494 

495 elif helpar in ["alphac","alphaw","betac","betaw","gammac","gammaw","phasec","phasew","epsilc","epsilw","zetac","zetaw"]: 

496 

497 files_backbones.append(file) 

498 

499 if helpar in ['alphac', 'alphaw', 'betac', 'betaw', 'gammac', 'gammaw', 'deltac', 'deltaw', 'epsilc', 'epsilw', 

500 'zetac', 'zetaw', 'chic', 'chiw', 'phasec', 'phasew']: 

501 

502 files_allbackbones.append(file) 

503 

504 

505 

506 information_dictionary = {'avg_res':{'backbone':{},'grooves':{},'axisbp':{},'intrabp':{},'interbp':{}, 'stiffness':{}}} 

507 

508 information_dictionary0 = get_stiffness(sequence,files_averages,information_dictionary,frames_limit) 

509 # 'ts':{'backbone':{},'grooves':{},'axisbp':{},'intrabp':{},'interbp':{}} 

510 # Before there was another field called 'ts' or time series but it was removed because now it is included in the .ser and .bin files 

511 

512 # Work with files that correspond to the different blocks of Helical Parameters and perform the different computations to each block  

513 

514 # Call function to distribute the different files to compute their averages 

515 information_dictionary1 = flow_files_average(files_averages,information_dictionary0,sequence) 

516 

517 # Call function to distribute the files related to backbone torsions and perform different calculations on each 

518 information_dictionary2 = flow_files_backbone(files_backbones,information_dictionary1) 

519 

520 # Call function to distribute all files and compute time series for all of them 

521 #info_dictionary = flow_files_timeseries(files_averages,files_allbackbones,information_dictionary2,frames_limit)  

522 return information_dictionary2 # Return the dictionary to convert it to a json 

523 

524# Function to compute the stiffness  

525def get_stiffness(sequence,files,info_dict,frames_limit): 

526 # Create an object (as NASSA does) to store the sequence and the corresponding .ser info so that we can then calculate the stiffness 

527 extracted = {} 

528 # First parse the sequence to obtain the NucleicAcid object as NASSA wf does 

529 seq = [] 

530 seq.append(load_sequence2(sequence, unit_len=6)) 

531 extracted['sequences'] = seq 

532 # Set the hexamer coordinates 

533 # AGUS: if pentamers are used, the following coordinates should be added: shear stagger stretch chic chiw buckle opening propel 

534 cordinate_list = ['shift','slide','rise','roll','tilt','twist'] 

535 for coord in cordinate_list: 

536 crd_data = [] 

537 for ser_file in files: 

538 if ser_file.split('_')[-1].split('.')[0].lower() == coord: 

539 ser = load_serfile( 

540 ser_file) 

541 crd_data.append(ser) 

542 extracted[coord.lower()] = crd_data 

543 print(f"loaded {len(crd_data)} files for coordinate <{coord}>") 

544 results = transform(extracted) 

545 # Add the results stiffness to the dictionary 

546 info_dict['avg_res']['stiffness'] = results 

547 return info_dict 

548 

549# AGUS: the following three functions are a copy of NASSA, modified for this case, since NASSA 

550# AGUS: works on all sequences/MDs at the same time while this workflow runs per project/MD 

551# AGUS: in addition, the unit name is now defined as hexamer by default 

552def transform(data, unit_name='hexamer'): 

553 sequences = data.pop('sequences') 

554 results = {"stiffness": [], "covariances": {}, "constants": {}} 

555 for traj, seq in enumerate(sequences): 

556 traj_series = {coord.lower(): data[coord][traj] 

557 for coord in data.keys()} 

558 traj_results = get_stiffness_seq( 

559 seq, 

560 traj_series) 

561 results["stiffness"].append(traj_results["stiffness"]) 

562 covariances_serializable = { 

563 key: df.to_dict(orient="split") # 'index', 'columns', 'data' 

564 for key, df in traj_results["covariances"].items() 

565 } 

566 results["covariances"].update(covariances_serializable) 

567 constants_serializable = { 

568 key: df.to_dict(orient="split") # 'index', 'columns', 'data' 

569 for key, df in traj_results["constants"].items() 

570 } 

571 results["constants"].update(constants_serializable) 

572 

573 # AGUS: here the structure of the stiffness dictionary is changed so that it is easier to handle in the front end 

574 stiffness_by_coord = {} 

575 for item in results["stiffness"]: 

576 for key, value in item.items(): 

577 if key == "hexamer": 

578 continue 

579 if key not in stiffness_by_coord: 

580 stiffness_by_coord[key] = [] 

581 stiffness_by_coord[key].append(value) 

582 results["stiffness"] = stiffness_by_coord 

583 for key, value in results["stiffness"].items(): 

584 # If it is a list with Series inside, concatenate and convert to a list 

585 if isinstance(value, list) and all(hasattr(v, "tolist") for v in value): 

586 combined = pd.concat(value) 

587 results["stiffness"][key] = combined.tolist() 

588 # If it is a single Series 

589 elif hasattr(value, "tolist"): 

590 results["stiffness"][key] = value.tolist() 

591 return results 

592 

593def get_stiffness_seq( 

594 sequence, 

595 series_dict): 

596 # get stiffness table for a given trajectory 

597 coordinates = list(series_dict.keys()) 

598 results = {"stiffness": {}, "covariances": {}, "constants": {}} 

599 diagonals = {} 

600 start = sequence.flanksize # + 2 

601 end = sequence.size - (sequence.baselen + sequence.flanksize - 1) # +2 

602 print(f"start: {start}, end: {end}") 

603 for i in range(start, end): 

604 hexamer = sequence.get_subunit(i) 

605 #ic_hexamer = sequence.inverse_complement(hexamer) 

606 cols_dict = {coord: series_dict[coord][i+1] 

607 for coord in series_dict.keys()} 

608 #print(f"cols_dict: {cols_dict}") 

609 stiffness_diag, cte, cov_df = get_subunit_stiffness( 

610 cols_dict, 

611 coordinates) 

612 diagonals[hexamer] = np.append( 

613 stiffness_diag, 

614 [np.prod(stiffness_diag), np.sum(stiffness_diag)]) 

615 # diagonals[ic_hexamer] = np.append( 

616 # stiffness_diag, 

617 # [np.product(stiffness_diag), np.sum(stiffness_diag)]) 

618 results["covariances"][hexamer] = cov_df 

619 #results["covariances"][ic_hexamer] = cov_df 

620 results["constants"][hexamer] = cte 

621 #results["constants"][ic_hexamer] = cte 

622 # build stiffness table 

623 columns = [sequence.unit_name] + coordinates + ["product", "sum"] 

624 results["stiffness"] = pd.DataFrame.from_dict( 

625 diagonals, 

626 orient="index").reset_index() 

627 results["stiffness"].columns = columns 

628 return results 

629 

630def get_subunit_stiffness( 

631 cols_dict, 

632 coordinates, 

633 scaling=[1, 1, 1, 10.6, 10.6, 10.6], 

634 KT=0.592186827): 

635 import numpy.ma as ma 

636 def circ_avg(xarr, degrees=True): 

637 n = len(xarr) 

638 if degrees: 

639 # convert to radians 

640 xarr = xarr * np.pi / 180 

641 av = np.arctan2( 

642 (np.sum(np.sin(xarr)))/n, 

643 (np.sum(np.cos(xarr)))/n) * 180 / np.pi 

644 return av 
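 # For illustration: circ_avg(np.array([350.0, 10.0])) returns ~0 degrees instead of the naive arithmetic mean of 180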

645 # AGUS: again, this is adapted only for hexamers 

646 # if (self.unit_len % 2) == 0: 

647 SH_av = cols_dict["shift"].mean() 

648 SL_av = cols_dict["slide"].mean() 

649 RS_av = cols_dict["rise"].mean() 

650 TL_av = circ_avg(cols_dict["tilt"]) 

651 RL_av = circ_avg(cols_dict["roll"]) 

652 TW_av = circ_avg(cols_dict["twist"]) 

653 # elif (self.unit_len % 2) == 1: 

654 # SH_av = cols_dict["shear"].mean() 

655 # SL_av = cols_dict["stretch"].mean() 

656 # RS_av = cols_dict["stagger"].mean() 

657 # CW_av = cols_dict["chiw"].mean() 

658 # CC_av = cols_dict["chic"].mean() 

659 # TL_av = self.circ_avg(cols_dict["buckle"]) 

660 # RL_av = self.circ_avg(cols_dict["propel"]) 

661 # TW_av = self.circ_avg(cols_dict["opening"]) 

662 cols_arr = [cols_dict[coord] for coord in coordinates] 

663 cols_arr = np.array(cols_arr).T 

664 

665 cv = ma.cov(ma.masked_invalid(cols_arr), rowvar=False) 

666 cv.filled(np.nan) 

667 

668 cov_df = pd.DataFrame(cv, columns=coordinates, index=coordinates) 

669 stiff = np.linalg.inv(cv) * KT 

670 #print(stiff) 
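 # stiff = KT * inverse(covariance): the harmonic force-constant matrix, with KT ~ 0.592 kcal/mol (room temperature);
 # the scaling factors applied further down multiply the angular diagonal terms (roll, tilt, twist) by 10.6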

671 # In NASSA, two extra variables (ChiC and ChiW, giving 8 coordinates) are added for pentamers; here only hexamers are handled 

672 if (6 % 2) == 0: 

673 last_row = [SH_av, SL_av, RS_av, TL_av, RL_av, TW_av] #, CW_av, CC_av] 

674 stiff = np.append(stiff, last_row).reshape(7, 6) 

675 # elif (self.unit_len % 2) == 1: 

676 # last_row = [SH_av, SL_av, RS_av, TL_av, RL_av, TW_av, CW_av, CC_av] 

677 # print(last_row) 

678 # stiff = np.append(stiff, last_row).reshape(9, 8) 

679 # scaling=[1, 1, 1, 10.6, 10.6, 10.6, 1, 1] 

680 

681 stiff = stiff.round(6) 

682 stiff_diag = np.diagonal(stiff) * np.array(scaling) 

683 

684 cte = pd.DataFrame(stiff) 

685 cte.columns = coordinates 

686 cte.index = coordinates + ["avg"] 

687 return stiff_diag, cte, cov_df 

688 

689# Function to check which helical parameter block the file belongs to; the value of baselen may change 

690def checking(helpar_name): 

691 # get base length and unit from helical parameter name 

692 if helpar_name.lower() in hp_basepairs: 

693 baselen = 1 

694 elif helpar_name.lower() in hp_singlebases: 

695 baselen = 0 

696 return baselen 

697 

698# Function to check which helical parameter block the file belongs to; baselen and the unit may change 

699def checking2(helpar_name): 

700 # get base length and unit from helical parameter name 

701 if helpar_name.lower() in hp_singlebases: 

702 baselen = 0 

703 else: 

704 baselen = 1 

705 if helpar_name.lower() in hp_angular: 

706 hp_unit = "Degrees" 

707 else: 

708 hp_unit = "Angstroms" 

709 return baselen,hp_unit 

710 

711# Function to compute the inverse complement DNA or RNA sequence; it is commented out because it is not used, but it is kept here in case it is needed 

712''' 

713# Compute inverse complement DNA or RNA sequence  

714def reverse_sequence(sequence,DNA=True): 

715 if DNA: # If the DNA flag is true we want to compute the inverse using T instead of U 

716 A_base = "T" 

717 else: # Now it is RNA so we want to convert T to U 

718 A_base = "U" 

719 inverse = {"A":A_base,"G":"C","C":"G",A_base:"A"} # Dictionary to convert easily the sequence  

720 inv_seq = "" 

721 for i in sequence[::-1]: # Traverse the sequence from the end to the beginning 

722 inv_seq += inverse[i] # Obtain the complementary base  

723 return inv_seq # Return the inverse complement 

724''' 

725 

726# Function to read a given .ser file and transform it into pandas dataframe 

727def read_series(input_serfile, usecols=None): 

728 """Read .ser file""" 

729 extra_kwargs = dict( 

730 header=None, 

731 sep='\\s+', 

732 index_col=0) 

733 ser_data = pd.read_csv(input_serfile, **extra_kwargs) 

734 if usecols is not None: 

735 if 0 in usecols: 

736 usecols.pop(usecols.index(0)) 

737 ser_data = ser_data[[i+1 for i in usecols]] 

738 return ser_data 

739 

740 

741# AVERAGES FUNCTION TO COMPUTE THE AVERAGE OF THE FOLLOWING BLOCKS REGARDING THE HELICAL PARAMETERS 

742# GROOVES, INTRA BASE PAIR, INTER BASE PAIR, AXIS BASE PAIR 

743 

744# FUNCTION TO CONTROL THE FLOW OF FILES THAT ENTER THE FUNCTION TO BE ANALYZED IN ORDER TO COMPUTE THE AVERAGE (MEAN AND STANDARD DEVIATION) 

745def flow_files_average(files,info_dict,sequence): 

746 for file in files: # Iterate over all files that must be analyzed 

747 word = file.split('.') 

748 helpword = word[0].split('_')[-1].lower() 

749 # Obtain baselen value depending on the file 

750 baselen = checking(helpword) 

751 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

752 dataframe = read_series(file) 

753 df1,df2 = average_std(dataframe,baselen,sequence) 

754 df1i,df2i = average_std_intra(dataframe,baselen,sequence) 

755 if helpword in ["roll","tilt","twist","rise","shift","slide"]: 

756 # INTER BASEPAIR BLOCK 

757 info_dict['avg_res']['interbp'][helpword] = {'avg':{},'std':{}} 

758 # we want to store all the information regarding the averages  

759 info_dict['avg_res']['interbp'][helpword]['avg'] = df1.T.values.tolist() 

760 

761 # we want to store all the information regarding the standard deviations 

762 info_dict['avg_res']['interbp'][helpword]['std'] = df2.T.values.tolist() 

763 

764 elif helpword in ["shear","stagger","stretch","buckle","opening","propel"]: 

765 # INTRA BASEPAIR BLOCK 

766 info_dict['avg_res']['intrabp'][helpword] = {'avg':{},'std':{}} 

767 # we want to store all the information regarding the averages  

768 info_dict['avg_res']['intrabp'][helpword]['avg'] = df1i.T.values.tolist() 

769 

770 # we want to store all the information regarding the standard deviations 

771 info_dict['avg_res']['intrabp'][helpword]['std'] = df2i.T.values.tolist() 

772 

773 elif helpword in ["xdisp","ydisp","inclin","tip"]: 

774 # AXIS BASEPAIR BLOCK  

775 info_dict['avg_res']['axisbp'][helpword] = {'avg':{},'std':{}} 

776 

777 # we want to store all the information regarding the averages 

778 info_dict['avg_res']['axisbp'][helpword]['avg'] = df1i.T.values.tolist() 

779 

780 # we want to store all the information regarding the standard deviations 

781 info_dict['avg_res']['axisbp'][helpword]['std'] = df2i.T.values.tolist() 

782 

783 else: 

784 # GROOVES BLOCK 

785 info_dict['avg_res']['grooves'][helpword] = {'avg':{},'std':{}} 

786 

787 # we want to store all the information regarding the averages  

788 info_dict['avg_res']['grooves'][helpword]['avg'] = df1.T.values.tolist() 

789 

790 # we want to store all the information regarding the standard deviations 

791 info_dict['avg_res']['grooves'][helpword]['std'] = df2.T.values.tolist() 

792 

793 return info_dict # Return dictionary with all information computed (averages and standard deviation) 

794 

795# Compute average and standard deviation 

796def average_std(dataf,baselen,sequence): 

797 # For hexamers, baselen has to be 2 

798 baselen = 2 

799 # discard first and last base(pairs) from sequence 

800 dataf = dataf[dataf.columns[2:17]] 

801 # sequence = sequence[1:] 

802 # print("sequence: ",sequence) 

803 xlabels = [ 

804 f"{sequence[i:i+1+baselen]}" 

805 for i in range(len(dataf.columns))] # - baselen 

806 means = dataf.mean(axis=0).iloc[:len(xlabels)] 

807 stds = dataf.std(axis=0).iloc[:len(xlabels)] 

808 return means,stds 

809 

810def average_std_intra(dataf,baselen,sequence): 

811 # For hexamers, baselen has to be 2 

812 baselen = 2 

813 # discard first and last base(pairs) from sequence 

814 dataf = dataf[dataf.columns[2:18]] 

815 # sequence = sequence[1:] 

816 # print("sequence: ",sequence) 

817 xlabels = [ 

818 f"{sequence[i:i+1+baselen]}" 

819 for i in range(len(dataf.columns))] # - baselen 

820 means = dataf.mean(axis=0).iloc[:len(xlabels)] 

821 stds = dataf.std(axis=0).iloc[:len(xlabels)] 

822 return means,stds 

823 

824# OTHER FUNCTIONS THAT PERFORM OTHER CALCULATIONS REGARDING THE LAST BLOCK OF HELICAL PARAMETERS (BACKBONE TORSIONS) 

825# THERE ARE 3 DIFFERENT FUNCTIONS EACH ONE COMPUTING DIFFERENT ASPECTS OF THE FLEXIBILITY IN THE BACKBONE  

826# SUGAR PUCKERING, CANONICAL ALPHA/GAMMA AND BI/BII POPULATIONS 

827 

828# FUNCTION TO CONTROL THE FLOW OF BACKBONE FILES AND DISPATCH THEM TO THE CORRESPONDING COMPUTATIONS 

829def flow_files_backbone(files,info_dict): 

830 # Store files related to Puckering computations 

831 puckering_files = [] 

832 # Store files related to Canonical Alpha Gamma computations 

833 canonicalg_files = [] 

834 # Store files related to BI and BII populations 

835 bipopulations_files = [] 

836 # Iterate over all files that must be analyzed 

837 for file in files: 

838 word = file.split('.') 

839 helpword = word[0].split('_')[-1].lower() 

840 # Sugar Puckering 

841 if helpword in ['phasec','phasew']: 

842 puckering_files.append(file) 

843 # Canonical Alpha/Gamma 

844 elif helpword in ['alphac','alphaw','gammac','gammaw']: 

845 canonicalg_files.append(file) 

846 # BI/BII Population 

847 elif helpword in ['epsilc','epsilw','zetac','zetaw']: 

848 bipopulations_files.append(file) 

849 info_dict['avg_res']['backbone'] = {'puckering':{'north':{},'east':{},'west':{},'south':{}},'bi':{},'bii':{},'canonical_alphagamma':{}} 

850 # Call function to perform computations regarding Sugar Puckering 

851 npop, epop, wpop, spop = sugar_puckering(puckering_files) 

852 info_dict['avg_res']['backbone']['puckering']['north'] = npop.T.values.tolist() 

853 info_dict['avg_res']['backbone']['puckering']['east'] = epop.T.values.tolist() 

854 info_dict['avg_res']['backbone']['puckering']['west'] = wpop.T.values.tolist() 

855 info_dict['avg_res']['backbone']['puckering']['south'] = spop.T.values.tolist() 

856 # Call function to perform computations regarding Canonical Alpha Gamma 

857 canon = canonical_alphagamma(canonicalg_files) 

858 info_dict['avg_res']['backbone']['canonical_alphagamma'] = canon.T.values.tolist() 

859 # Call function to perform computations regarding BI and BII populations 

860 b1,b2 = bi_populations(bipopulations_files) 

861 info_dict['avg_res']['backbone']['bi'] = b1.T.values.tolist() 

862 info_dict['avg_res']['backbone']['bii'] = b2.T.values.tolist() 

863 return info_dict 

864 

865######## SUGAR PUCKERING  

866 

867# READ FILES AND CORRECT ANGLES  

868def sugar_puckering(pcukfiles): 

869 for file in pcukfiles: # Iterate over all files  

870 if file[-5:] == "C.ser": 

871 phasec = file 

872 else: 

873 phasew = file 

874 

875 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

876 phasec = read_series(phasec) 

877 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

878 phasew = read_series(phasew) 

879 # Fix angles that are negative 

880 phasec = fix_angles(phasec) 

881 # Fix angles that are negative 

882 phasew = fix_angles(phasew) 

883 

884 return cpuck(phasec,phasew) 

885 

886# Correct angle values as we don't want negative angle measurements 

887def fix_angles(dataset): 

888 values = np.where(dataset < 0, dataset + 360, dataset) 

889 dataset = pd.DataFrame(values) 

890 return dataset 

891 

892# Compute sugar puckering, using 2 files phaseC and phaseW 

893def cpuck(phaseC,phaseW): 

894 separator_df = pd.DataFrame({"-": np.nan}, index=range(1, len(phaseC))) 

895 phase = pd.concat([ 

896 phaseW, 

897 separator_df, 

898 phaseC[phaseC.columns[::-1]]], 

899 axis=1) 

900 

901 Npop = np.logical_or(phase > 315, phase < 45).mean() * 100 

902 Epop = np.logical_and(phase > 45, phase < 135).mean() * 100 

903 Wpop = np.logical_and(phase > 225, phase < 315).mean() * 100 

904 Spop = np.logical_and(phase > 135, phase < 225).mean() * 100 
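 # Percentages of frames in each pseudorotation quadrant: North (~C3'-endo), East, West and South (~C2'-endo)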

905 

906 return Npop, Epop, Wpop, Spop 

907 

908######## CANONICAL ALPHA/GAMMA 

909 

910# READ FILES AND CORRECT ANGLES REGARDING CANONICAL ALPHA GAMMA 

911def canonical_alphagamma(canonfiles): 

912 for file in canonfiles: 

913 if file[-7:] == "haC.ser": 

914 alphac = file 

915 elif file[-7:] == "haW.ser": 

916 alphaw = file 

917 elif file[-7:] == "maC.ser": 

918 gammac = file 

919 elif file[-7:] == "maW.ser": 

920 gammaw = file 

921 

922 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

923 alphac = read_series(alphac) 

924 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

925 alphaw = read_series(alphaw) 

926 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

927 gammac = read_series(gammac) 

928 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

929 gammaw = read_series(gammaw) 

930 # Fix angles that are negative and over 360 degrees 

931 alphac = fix_angles2(alphac) 

932 # Fix angles that are negative and over 360 degrees 

933 alphaw = fix_angles2(alphaw) 

934 # Fix angles that are negative and over 360 degrees 

935 gammac = fix_angles2(gammac) 

936 # Fix angles that are negative and over 360 degrees 

937 gammaw = fix_angles2(gammaw) 

938 

939 return check_alpgamm(alphac,gammac,alphaw,gammaw) 

940 

941# Correct angle values as we don't want negative angle measurements or values above 360 degrees 

942def fix_angles2(dataset): 

943 values = np.where(dataset < 0, dataset + 360, dataset) 

944 values = np.where(values > 360, values - 360, values) 

945 dataset = pd.DataFrame(values) 

946 return dataset 

947 

948# Compute canonical populations using 4 different files alphaC, gammaC, alphaW and gammaW 

949def check_alpgamm(alphaC,gammaC,alphaW,gammaW): 

950 separator_df = pd.DataFrame({"-": np.nan}, index=range(len(gammaW))) 

951 gamma = pd.concat([ 

952 gammaW, 

953 separator_df, 

954 gammaC[gammaC.columns[::-1]]], 

955 axis=1) 

956 alpha = pd.concat([ 

957 alphaW, 

958 separator_df, 

959 alphaC[alphaC.columns[::-1]]], 

960 axis=1) 

961 alpha_filter = np.logical_and(alpha > 240, alpha < 360) 

962 gamma_filter = np.logical_and(gamma > 0, gamma < 120) 

963 canonical_alpha_gamma = np.logical_and( 

964 alpha_filter, gamma_filter).mean() * 100 
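 # Canonical alpha/gamma here means alpha in the gauche- region (240-360 deg) together with gamma in the
 # gauche+ region (0-120 deg); the result is the percentage of frames per residue fulfilling both conditions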

965 

966 return canonical_alpha_gamma 

967 

968# BI / BII POPULATIONS 

969 

970def bi_populations(bfiles): 

971 for file in bfiles: 

972 if file[-7:] == "ilC.ser": 

973 epsilc = file 

974 elif file[-7:] == "ilW.ser": 

975 epsilw = file 

976 elif file[-7:] == "taC.ser": 

977 zetac = file 

978 elif file[-7:] == "taW.ser": 

979 zetaw = file 

980 

981 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

982 epsilc = read_series(epsilc) 

983 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

984 epsilw = read_series(epsilw) 

985 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

986 zetac = read_series(zetac) 

987 # Convert the files into Pandas dataframe to make the computations and data manipulation easily 

988 zetaw = read_series(zetaw) 

989 # Fix angles that are negative and over 360 degrees 

990 epsilc = fix_angles2(epsilc) 

991 # Fix angles that are negative and over 360 degrees 

992 epsilw = fix_angles2(epsilw) 

993 # Fix angles that are negative and over 360 degrees 

994 zetac = fix_angles2(zetac) 

995 # Fix angles that are negative and over 360 degrees 

996 zetaw = fix_angles2(zetaw) 

997 # Compute differences between Epsilon and Zeta values 

998 diff_epsil_zeta = angles_diff_ze(epsilc,zetac,epsilw,zetaw) 

999 BI,BII = bi_pop(diff_epsil_zeta) 

1000 return BI,BII 

1001 

1002# Compute difference between epsil and zeta 

1003def angles_diff_ze( epsilC, zetaC, epsilW, zetaW): 

1004 # concatenate zeta and epsil arrays 

1005 separator_df = pd.DataFrame({"-": np.nan}, index=range(len(zetaW))) 

1006 zeta = pd.concat([ 

1007 zetaW, 

1008 separator_df, 

1009 zetaC[zetaC.columns[::-1]]], 

1010 axis=1) 

1011 epsil = pd.concat([ 

1012 epsilW, 

1013 separator_df, 

1014 epsilC[epsilC.columns[::-1]]], 

1015 axis=1) 

1016 

1017 # difference between epsilon and zeta coordinates 

1018 diff_epsil_zeta = epsil - zeta 

1019 return diff_epsil_zeta 

1020 

1021# Compute BI and BII populations 
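# Note: BI conformers have epsilon - zeta around -90 degrees and BII around +90, so the sign of the difference separates the two states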

1022def bi_pop(diff_epsil_zeta): 

1023 BI = (diff_epsil_zeta < 0).sum(axis=0) * 100 / len(diff_epsil_zeta) 

1024 BII = 100 - BI 

1025 return BI, BII 

1026 

1027######## TIME SERIES 

1028 

1029# Function to distribute all .ser files in order to compute later Time Series 

1030def flow_files_timeseries(files_average,files_backbone,info_dict,frames_limit): 

1031 for file in files_average: # Iterate over all files that must be analyzed 

1032 word = file.split('.') 

1033 helpword = word[0].split('_')[-1].lower() 

1034 df = time_series(file,helpword,frames_limit) 

1035 

1036 # Store all computations from files that are related to the block Inter Basepair  

1037 if helpword in ["roll","tilt","twist","rise","shift","slide"]: 

1038 info_dict['ts']['interbp'][helpword] = {} 

1039 # Store all information  

1040 info_dict['ts']['interbp'][helpword] = df.T.values.tolist() 

1041 

1042 # Store all computations from files that are related to the block Intra Basepair 

1043 elif helpword in ["shear","stagger","stretch","buckle","opening","propel"]: 

1044 info_dict['ts']['intrabp'][helpword] = {} 

1045 # Store all information  

1046 info_dict['ts']['intrabp'][helpword] = df.T.values.tolist() 

1047 

1048 # Store all computations from files that are related to the block Axis Basepair 

1049 elif helpword in ["xdisp","ydisp","inclin","tip"]: 

1050 info_dict['ts']['axisbp'][helpword] = {} 

1051 # Store all information 

1052 info_dict['ts']['axisbp'][helpword] = df.T.values.tolist() 

1053 

1054 

1055 else: 

1056 # Store all computations from files that are related to the block Grooves 

1057 info_dict['ts']['grooves'][helpword] = {} 

1058 # Store all information  

1059 info_dict['ts']['grooves'][helpword] = df.T.values.tolist() 

1060 

1061 for file2 in files_backbone: 

1062 word = file2.split('.') 

1063 helpword = word[0].split('_')[-1].lower() 

1064 df1 = time_series(file2,helpword,frames_limit) 

1065 # Store all computations from files that are related to the block Backbone Torsions 

1066 info_dict['ts']['backbone'][helpword] = {} 

1067 # Store all information 

1068 info_dict['ts']['backbone'][helpword] = df1.T.values.tolist() 

1069 

1070 

1071 return info_dict 

1072 

1073# Function to compute Time Series for each files that flow_files_timeseries() passes 

1074def time_series(file,word,reduced_trajectory_frames_limit): 

1075 sequence = "GCTTTTAGCGGTTGAACGGC" 

1076 baselen,hp_unit = checking2(word) 

1077 ser_data = read_series(file) 

1078 ser_data = ser_data[ser_data.columns[1:-1]] 

1079 sequence = sequence[1:] 

1080 

1081 ''' 

1082 subunits = [ 

1083 f"{i+1}_{sequence[i:i+1+baselen]}" 

1084 for i in range(len(ser_data.columns))] 

1085 ser_data.columns = subunits 

1086 ''' 

1087 snapshots = len(ser_data.index) 

1088 

1089 step = math.ceil(snapshots)# / reduced_trajectory_frames_limit) 

1090 

1091 ser_data = ser_data[0:len(ser_data.index):step] 

1092 return ser_data