Coverage for model_workflow/analyses/helical_parameters.py: 89%
472 statements
coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
import numpy as np
import os
from os.path import exists
import pandas as pd
import math
import subprocess
from shutil import move
import glob
from struct import pack
import re
from warnings import warn
from model_workflow.utils.auxiliar import save_json, store_binary_data
from model_workflow.utils.constants import OUTPUT_HELICAL_PARAMETERS_FILENAME
from model_workflow.utils.type_hints import *
from model_workflow.tools.nassa_loaders import load_sequence2
from model_workflow.tools.nassa_loaders import load_serfile

conda_prefix = os.environ['CONDA_PREFIX']
# If this path does not exist then it means curves is not installed
curves_path = conda_prefix + '/.curvesplus'
# Note that this is not a file, but the prefix to 3 different files
standard_prefix = curves_path + '/standard'
# TAKE INTO ACCOUNT THAT THIS CODE WAS DEVELOPED AND IMPLEMENTED IN THIS WORKFLOW USING AS A TEMPLATE CODE FROM THE IRBBARCELONA BIOBB
# HERE IS THE LINK TO THEIR WEBPAGE WITH MORE WORKFLOWS COMPUTING OTHER ANALYSES
# https://mmb.irbbarcelona.org/biobb/workflows
# HERE IS THE LINK TO THE GITHUB REPOSITORY WITH THE PYTHON CODE
# https://github.com/bioexcel/biobb_wf_dna_helparms
# HERE IS THE LINK TO THE INFORMATION ON HOW TO EXECUTE CURVES+ AND CANAL AND INTERPRET THEIR OUTPUTS, AS WELL AS THE INPUT NEEDED FOR BOTH
# http://gbio-pbil.ibcp.fr/tools/curves_plus/canal-user-guide.html
# List with the different file names that correspond to single bases
hp_singlebases = [
    "shear",
    "stagger",
    "stretch",
    "buckle",
    "opening",
    "propel",
    "inclin",
    "tip",
    "xdisp",
    "ydisp"
]

# List with the different file names that correspond to base pairs
hp_basepairs = [
    "majw",
    "majd",
    "minw",
    "mind",
    "rise",
    "shift",
    "slide",
    "roll",
    "tilt",
    "twist"
]

# List with the different file names for which we will use degree units
hp_angular = [
    "roll",
    "tilt",
    "twist",
    "buckle",
    "opening",
    "propel",
    "inclin",
    "tip"
]

# List with the different file names for which we will use angstrom units
hp_translational = [
    "shear",
    "stagger",
    "stretch",
    "xdisp",
    "ydisp",
    "majw",
    "majd",
    "minw",
    "mind",
    "rise",
    "shift",
    "slide"
]

# List with the different file names corresponding to the Backbone block of Helical Parameters
hp_backbone = [
    "alphac",
    "alphaw",
    "betac",
    "betaw",
    "gammac",
    "gammaw",
    "deltac",
    "deltaw",
    "chic",
    "chiw",
    "phasec",
    "phasew",
    "epsilc",
    "epsilw",
    "zetac",
    "zetaw"
]
helical_parameters = hp_basepairs + hp_singlebases + hp_backbone
baselen = 0
hp_unit = ""

def helical_parameters (
    topology_file : 'File',
    trajectory_file : 'File',
    structure_file : 'File',
    output_directory : str,
    structure : 'Structure',
    frames_limit : Optional[int] = None,
    dna_selection : Optional[str] = None
):
    """Helical parameters analysis."""
    # Set the main output filepath
    output_analysis_filepath = f'{output_directory}/{OUTPUT_HELICAL_PARAMETERS_FILENAME}'
    # Save the original path
    actual_path = os.getcwd()

    # Get a selection from both chains in the B-DNA
    selection = None
    # In case a selection was provided then use it
    if dna_selection:
        selection = structure.select(dna_selection, syntax='vmd')
    # Otherwise, get all nucleic acids
    else:
        selection = structure.select_nucleic()

    # If there is nothing to analyze then we return here
    if not selection:
        print(' There are no nucleic acids')
        return

    # Check curves is installed
    if not exists(curves_path):
        raise SystemExit(' Cannot find curves path. Is Curves+ installed?')
    # Get the sequence from the selected chains
    chain_indices = structure.get_selection_chain_indices(selection)
    if len(chain_indices) != 2:
        raise SystemExit('We have a number of chains other than 2. Is this canonical B-DNA? Make sure every strand has an independent chain')
    sequences = []
    residue_index_ranges = []
    for c, chain_index in enumerate(chain_indices):
        chain = structure.chains[chain_index] # Obtain the chain of nucleic acids
        sequences.append(chain.get_sequence()) # Retrieve its sequence
        first_residue_index = chain.residue_indices[0] + 1
        last_residue_index = chain.residue_indices[-1] + 1
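        # Note that the residue range of the second chain is reversed so that, in the
        # Curves+ input written below, its residues pair up with those of the first strand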
        residue_index_range = (first_residue_index, last_residue_index) if c == 0 else (last_residue_index, first_residue_index)
        residue_index_ranges.append(residue_index_range)
    # Run the hydrogen bond analysis to generate the .dat file
    hydrogen_bonds(
        topology_file,
        trajectory_file,
        output_analysis_filepath,
        output_directory,
        structure_file,
    )

    terminal_execution(
        trajectory_file.path,
        structure_file.path,
        residue_index_ranges,
        sequences[0],
        output_directory
    )
    # Save in a dictionary all the computations done by the different functions called by the send_files function
    dictionary_information = send_files(sequences[0], frames_limit, output_directory)
    # Set the path back to the original directory, outside the helical parameters folder
    os.chdir(actual_path)
    # Convert the dictionary into a json file
    # DANI: We are getting NaNs which are not supported further down the line, they must be removed
    save_json(dictionary_information, output_analysis_filepath)

# Function to execute cpptraj and calculate the .dat file (hydrogen bonds)
def hydrogen_bonds(
    topology_file : 'File',
    trajectory_file : 'File',
    output_analysis_filepath : str,
    output_directory : str,
    structure_file : 'File',
):
    # This analysis is done using cpptraj and it is carried out in two steps
    # 1. Create the cpptraj.in file to generate the dry trajectory (cpptraj_dry.inpcrd)
    # parm_path = f"/orozco/projects/ABCix/ProductionFiles"
    # setup_path = f"/orozco/projects/ABCix/ProductionFiles/SETUP"

    # # Config cpptraj
    # cpptraj_in_file = f"{output_directory}/cpptraj.in"
    # cpptraj_out_file = f"{output_directory}/cpptraj_dry.inpcrd"

    # if not os.path.exists(cpptraj_out_file):

    # # Exec cpptraj
    # cpptraj_command = f"cpptraj {cpptraj_in_file}"
    # res = subprocess.run(cpptraj_command, shell=True)

    ################
    # 2. From the dry.inpcrd file we will generate the .dat file
    # Create the helical parameters folder
    # If the folder already exists don't create it again
    output_dat_file = os.path.join(output_directory, "mdf.nahbonds.dat")

    print(f"Saving .dat file to: {output_dat_file}")

    # Select the reference structure
    # AGUS: for the ABCix project we used the inpcrd files of the simulation as the reference because using our own pdb gave many errors
    inpcrd_file = glob.glob(os.path.join(output_directory, "*.inpcrd"))
    if len(inpcrd_file) == 0:
        warn(f"No inpcrd file found in the helical_parameters folder '{output_directory}', executing cpptraj with structure PDB.")
        structure_reference = structure_file.path
    else:
        structure_reference = inpcrd_file[0] # Select the first matching file
    # Create the content of the cpptraj input using the variables
    cpptraj_script = f"""
    # Load the topology file
    parm {topology_file.path}

    # Load the initial reference structure
    trajin {structure_reference}

    # Load the trajectory
    trajin {trajectory_file.path}

    # Run the NA structure analysis
    nastruct NA

    # Run the command to process the trajectory
    run

    # Write out hydrogen bond information to a file
    writedata {output_dat_file} NA[hb]

    # Quit
    quit
    """
252 print("Executing cpptraj...")
253 # Execute cpptraj with the generated script
254 subprocess.run(["cpptraj"], input=cpptraj_script, text=True, check=True)
256 # Check if the output file was created
257 if not os.path.exists(output_dat_file):
258 raise SystemExit(f"Error: {output_dat_file} was not created. Check the cpptraj command and input files.")
    # Renumber the dat file because the first line is repeated
    # AGUS: we had a lot of trouble with this in the ABCix project and in the end we chose to remove the first line and renumber everything
    # AGUS: besides, it also helped to compress the file and read it better
    def renumber_dat_file(file_path):
        with open(file_path, 'r') as file:
            lines = file.readlines()

        counter = 1
        first = True

        renumbered_lines = []
        for line in lines:
            if line.strip().startswith('#'):
                renumbered_lines.append(line)
            elif first:
                first = False
                continue
            else:
                renumbered_lines.append(f"{counter} {' '.join(map(str, line.split()[1:]))}\n")
                counter += 1

        with open(file_path, 'w') as file:
            file.writelines(renumbered_lines)
    print("Renumbering the dat file...")
    # Renumber the dat file
    renumber_dat_file(output_dat_file)

    # Set the function to compress the dat file to .bin file
    def compress_dat_to_bin(output_dat_file):
        data = []
        with open(output_dat_file, 'r') as file:
            # Identify which columns we must mine using the headers
            headers = next(file).strip().split()
            column_order = []
            n_from = 1
            n_to = 40
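            # The code assumes each Watson-Crick pair column header joins both residue
            # numbers and bases (e.g. a header like '1A40T' for the pair between residues
            # 1 and 40 of a 40-residue duplex), so it walks n_from up and n_to down to
            # collect the 20 pair columns in order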
            for h, header in enumerate(headers):
                target = re.compile(f'{n_from}[ACGT]{n_to}[ACGT]')
                if not target.match(header): continue
                column_order.append(h)
                n_from += 1
                n_to -= 1
            # Now iterate over the rest of the lines, which contain the actual values
            # Initialize the number of frames to count all of them
            n_frames = 0
            for line in file:
                values = line.strip().split()
                for c in column_order: data.append(int(values[c]))
                n_frames += 1

        # Set the name of the binary dat file
        bin_file_path = output_dat_file.replace('.dat', '.bin')

        # In this case we store a hydrogen bond value every 2 bits
        bit_size = 2

        # Start writing the output file
        with open(bin_file_path, 'wb') as file:
            bit_count = 0
            current_byte = ''
            # Iterate over the data list values
            for value in data:
                # Parse the value to binary and make sure the binary is as long as the bit size
                bits = format(value, 'b').zfill(bit_size)
                if len(bits) != bit_size:
                    raise ValueError(f'Value {value} cannot be stored in {bit_size} bits')
                # Add bits one by one to the current byte to be written
                for bit in bits:
                    current_byte += bit
                    bit_count += 1
                    # If the current byte is full then write it to the output file
                    if bit_count == 8:
                        #print(current_byte + ' -> ' + str(int(current_byte, 2)))
                        file.write(pack('<B', int(current_byte, 2)))
                        current_byte = ''
                        bit_count = 0
            # If the last byte is truncated then fill it with 0s and write it
            if bit_count != 0:
                last_byte = current_byte.ljust(8, '0')
                file.write(pack('<B', int(last_byte, 2)))

        # Set the name of the meta file and create the meta data corresponding to the binary dat file
        meta_file_path = bin_file_path + '.meta.json'
        meta_data = {
            'x': {
                'name': 'bases',
                'length': 20
            },
            'y': {
                'name': 'frames',
                'length': n_frames
            },
            'bitsize': 2,
        }
        save_json(meta_data, meta_file_path)

    # Compress the dat file to .bin file
    compress_dat_to_bin(output_dat_file)
    print("Dat file compressed to bin file, removing original dat file...")
    # Remove the original dat file
    #os.remove(output_dat_file)
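    # A minimal sketch (not used by the workflow) of how the packed .bin file could be
    # read back, assuming the exact layout written by compress_dat_to_bin above: one
    # value every 2 bits, most significant bits first within each byte, and 20 base
    # pairs per frame as recorded in the .meta.json file
    # def read_hbond_bin(bin_file_path, n_frames, n_bases=20, bit_size=2):
    #     values = []
    #     with open(bin_file_path, 'rb') as file:
    #         for byte in file.read():
    #             bits = format(byte, '08b')
    #             for i in range(0, 8, bit_size):
    #                 values.append(int(bits[i:i + bit_size], 2))
    #     # Discard the padding bits used to complete the last byte
    #     return values[:n_frames * n_bases]
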
def terminal_execution(
    trajectory_filepath: 'str',
    topology_filepath: 'str',
    strand_indexes: 'list',
    sequence: 'str',
    output_directory: 'str'):
    """Function to execute the Curves+ and Canal software to generate the needed output."""

    # Purge residual files from previous runs
    possible_residual_filenames = glob.glob(output_directory+'/*.cda') \
        + glob.glob(output_directory+'/*.lis') \
        + glob.glob(output_directory+'/*.ser')
    for filename in possible_residual_filenames:
        os.remove(filename)

    # Indicate all the instructions with the desired commands; keep in mind that there could be more commands to include in order to obtain other results and files
    instructions = [
        "Cur+ <<!",
        " &inp",
        f"file={trajectory_filepath}",
        f"ftop={topology_filepath}",
        f"lis={output_directory}/test",
        f"lib={standard_prefix}",
        " &end",
        "2 1 -1 0 0",
        f"{strand_indexes[0][0]}:{strand_indexes[0][1]}",
        f"{strand_indexes[1][0]}:{strand_indexes[1][1]}",
        "!"
    ]
    instructions = ["\n".join(instructions)]
    cmd = " ".join(instructions)
    print(' Running curves')
    # Execute the software using the instructions written previously
    process = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True,executable=os.getenv('SHELL', '/bin/sh'))

    out, err = process.communicate()
    process.wait()
    out.decode("utf-8")

    # Enter the helical parameters folder to execute the Canal software, as it needs the Curves+ output as input
    os.chdir(output_directory)

    # If the output has not been generated then warn the user
    if not os.path.exists('test.cda'):
        raise SystemExit('Something went wrong with the Curves+ software')
    # Change the file name, as Canal just takes the name of the .cda file to differentiate between .lis and .cda
    move("test.cda","cinput.cda")
    # We set level1 and level2 to 0 because if lev1=lev2=0, lev1 is set to 1 and lev2 is set to the length of the oligomer
    level1 = 0
    level2 = 0
    # We just want to compute series files, but if you want more output check the link written before
    # and include the corresponding options in the instructions, similarly to the series input

415 series = ".t."
416 instructions2 = [
417 "Canal <<! ",
418 "&inp",
419 " lis=canal_output,",
420 f" lev1={level1},lev2={level2},",
421 f" series={series},",
422 "&end",
423 f"cinput {sequence}",
424 "!"
425 ]
426 instructions2 = ["\n".join(instructions2)]
427 cmd2 = " ".join(instructions2)
    #logs = subprocess.run(instructions,stderr=subprocess.PIPE).stderr.decode()
    print(' Running canal')
    process = subprocess.Popen(cmd2,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True,executable=os.getenv('SHELL', '/bin/sh'))

    out, err = process.communicate()
    process.wait()
    out.decode("utf-8")

    # If the output has not been generated then warn the user
    if not os.path.exists('canal_output_phaseC.ser'):
        raise SystemExit('Something went wrong with the Canal software')

def send_files(sequence: str, frames_limit: int, output_directory: str):
    files_averages = []
    files_backbones = []
    files_allbackbones = []

    # Iterate over all files in the directory
    for file in os.listdir():
        if not file.endswith(".ser"): # Check that we are going to analyze the correct files, those with the .ser extension
            continue
        # data = []
        # with open(file, 'r') as ser_file:
        #     # Skip the first line since it is only the headers
        #     n_lines = 0
        #     for line in ser_file:
        #         # Skip the first column since it is only the row number
        #         numbers = [ s for s in line.strip().split()[1:] ]
        #         data += numbers
        #         n_lines += 1
        #         # if len(data) > 1000:
        #         #     break
        # # Save the file with the same name but with .bin extension and mdf prefix
        # file_name = 'mdf.' + file
        # file_path = os.path.join(os.getcwd(), file_name.replace('.ser', '.bin'))
        # # Store data in a binary format
        # store_binary_data(
        #     data=data,
        #     byte_size=4, # 32 bits
        #     filepath=file_path
        # )
        # # We need the meta file to store the information about the binary file (with the same name and .meta.json extension)
        # name_meta_file = file_path + '.meta.json'
        # meta_data = {
        #     'x': {
        #         'name': 'bases',
        #         'length': len(numbers) # Number of columns. It is the last value of the loop over lines in ser_file (maybe it causes problems)
        #     },
        #     'y': {
        #         'name': 'frames',
        #         'length': n_lines # Total number of lines in the file, i.e. the number of frames
        #     },
        #     'bitsize': 32, # 32 bits
        # }
        # save_json(meta_data, name_meta_file)

        # THIS PART OF THE CODE IS TO ANALYZE THE FILES AND COMPUTE THE AVERAGES, STANDARD DEVIATIONS AND TIME SERIES
        # BUT IT SHOULD BE REMOVED
        words = file.split('.')
        helpar = words[0].split('_')[-1].lower()
        # Check which block the file came from
        if helpar in ["shear","stagger","stretch","buckle","opening","propel","inclin","tip","xdisp","ydisp",
                      "majw","majd","minw","mind","rise","shift","slide","roll","tilt","twist"]:
            files_averages.append(file)

        elif helpar in ["alphac","alphaw","betac","betaw","gammac","gammaw","phasec","phasew","epsilc","epsilw","zetac","zetaw"]:
            files_backbones.append(file)

        if helpar in ['alphac', 'alphaw', 'betac', 'betaw', 'gammac', 'gammaw', 'deltac', 'deltaw', 'epsilc', 'epsilw',
                      'zetac', 'zetaw', 'chic', 'chiw', 'phasec', 'phasew']:
            files_allbackbones.append(file)

    information_dictionary = {'avg_res':{'backbone':{},'grooves':{},'axisbp':{},'intrabp':{},'interbp':{}, 'stiffness':{}}}

    information_dictionary0 = get_stiffness(sequence,files_averages,information_dictionary,frames_limit)
    # 'ts':{'backbone':{},'grooves':{},'axisbp':{},'intrabp':{},'interbp':{}}
    # Before there was another field called 'ts' (time series) but it was removed because that data is now included in the .ser and .bin files

    # Work with the files that correspond to the different blocks of Helical Parameters and perform the different computations for each block

    # Call the function that distributes the different files to compute their averages
    information_dictionary1 = flow_files_average(files_averages,information_dictionary0,sequence)

    # Call the function that distributes the files related to backbone torsions and performs the different calculations on each
    information_dictionary2 = flow_files_backbone(files_backbones,information_dictionary1)

    # Call the function to distribute all files and compute the Time Series on all of them
    #info_dictionary = flow_files_timeseries(files_averages,files_allbackbones,information_dictionary2,frames_limit)
    return information_dictionary2 # Return the dictionary to convert it to a json

# Function to compute the stiffness
def get_stiffness(sequence,files,info_dict,frames_limit):
    # Create an object (as NASSA does) to store the sequence and the corresponding .ser info so we can then calculate the stiffness
    extracted = {}
    # First parse the sequence to obtain the NucleicAcid object as the NASSA wf does
    seq = []
    seq.append(load_sequence2(sequence, unit_len=6))
    extracted['sequences'] = seq
    # Set the hexamer coordinates
    # AGUS: if they were pentamers the following coordinates should be added: shear stagger stretch chic chiw buckle opening propel
    coordinate_list = ['shift','slide','rise','roll','tilt','twist']
    for coord in coordinate_list:
        crd_data = []
        for ser_file in files:
            if ser_file.split('_')[-1].split('.')[0].lower() == coord:
                ser = load_serfile(
                    ser_file)
                crd_data.append(ser)
        extracted[coord.lower()] = crd_data
        print(f"loaded {len(crd_data)} files for coordinate <{coord}>")
    results = transform(extracted)
    # Add the stiffness results to the dictionary
    info_dict['avg_res']['stiffness'] = results
    return info_dict

# AGUS: these next three functions are a copy from NASSA but modified for this case, since NASSA
# AGUS: works for all sequences or MDs at the same time while the WF works per project/MD
# AGUS: in addition, the unit name is now defined as hexamer by default
def transform(data, unit_name='hexamer'):
    sequences = data.pop('sequences')
    results = {"stiffness": [], "covariances": {}, "constants": {}}
    for traj, seq in enumerate(sequences):
        traj_series = {coord.lower(): data[coord][traj]
                       for coord in data.keys()}
        traj_results = get_stiffness_seq(
            seq,
            traj_series)
        results["stiffness"].append(traj_results["stiffness"])
        covariances_serializable = {
            key: df.to_dict(orient="split")  # 'index', 'columns', 'data'
            for key, df in traj_results["covariances"].items()
        }
        results["covariances"].update(covariances_serializable)
        constants_serializable = {
            key: df.to_dict(orient="split")  # 'index', 'columns', 'data'
            for key, df in traj_results["constants"].items()
        }
        results["constants"].update(constants_serializable)

    # AGUS: here the structure of the stiffness dictionary is changed so that it is easier to handle in the front end
    stiffness_by_coord = {}
    for item in results["stiffness"]:
        for key, value in item.items():
            if key == "hexamer":
                continue
            if key not in stiffness_by_coord:
                stiffness_by_coord[key] = []
            stiffness_by_coord[key].append(value)
    results["stiffness"] = stiffness_by_coord
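    # At this point results["stiffness"] maps each remaining column (shift, slide, rise,
    # tilt, roll, twist, plus "product" and "sum") to its per-hexamer values, which are
    # flattened into plain lists below so the front end can consume them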
    for key, value in results["stiffness"].items():
        # If it is a list with Series inside, concatenate them and convert to a list
        if isinstance(value, list) and all(hasattr(v, "tolist") for v in value):
            combined = pd.concat(value)
            results["stiffness"][key] = combined.tolist()
        # If it is a single Series
        elif hasattr(value, "tolist"):
            results["stiffness"][key] = value.tolist()
    return results

def get_stiffness_seq(
        sequence,
        series_dict):
    # get the stiffness table for a given trajectory
    coordinates = list(series_dict.keys())
    results = {"stiffness": {}, "covariances": {}, "constants": {}}
    diagonals = {}
    start = sequence.flanksize # + 2
    end = sequence.size - (sequence.baselen + sequence.flanksize - 1) # +2
    print(f"start: {start}, end: {end}")
    for i in range(start, end):
        hexamer = sequence.get_subunit(i)
        #ic_hexamer = sequence.inverse_complement(hexamer)
        cols_dict = {coord: series_dict[coord][i+1]
                     for coord in series_dict.keys()}
        #print(f"cols_dict: {cols_dict}")
        stiffness_diag, cte, cov_df = get_subunit_stiffness(
            cols_dict,
            coordinates)
        diagonals[hexamer] = np.append(
            stiffness_diag,
            [np.prod(stiffness_diag), np.sum(stiffness_diag)])
        # diagonals[ic_hexamer] = np.append(
        #     stiffness_diag,
        #     [np.prod(stiffness_diag), np.sum(stiffness_diag)])
        results["covariances"][hexamer] = cov_df
        #results["covariances"][ic_hexamer] = cov_df
        results["constants"][hexamer] = cte
        #results["constants"][ic_hexamer] = cte
    # build the stiffness table
    columns = [sequence.unit_name] + coordinates + ["product", "sum"]
    results["stiffness"] = pd.DataFrame.from_dict(
        diagonals,
        orient="index").reset_index()
    results["stiffness"].columns = columns
    return results

def get_subunit_stiffness(
        cols_dict,
        coordinates,
        scaling=[1, 1, 1, 10.6, 10.6, 10.6],
        KT=0.592186827):
    import numpy.ma as ma
    def circ_avg(xarr, degrees=True):
        n = len(xarr)
        if degrees:
            # convert to radians
            xarr = xarr * np.pi / 180
        av = np.arctan2(
            (np.sum(np.sin(xarr)))/n,
            (np.sum(np.cos(xarr)))/n) * 180 / np.pi
        return av
    # AGUS: again, this is adapted only for hexamers
    # if (self.unit_len % 2) == 0:
    SH_av = cols_dict["shift"].mean()
    SL_av = cols_dict["slide"].mean()
    RS_av = cols_dict["rise"].mean()
    TL_av = circ_avg(cols_dict["tilt"])
    RL_av = circ_avg(cols_dict["roll"])
    TW_av = circ_avg(cols_dict["twist"])
    # elif (self.unit_len % 2) == 1:
    #     SH_av = cols_dict["shear"].mean()
    #     SL_av = cols_dict["stretch"].mean()
    #     RS_av = cols_dict["stagger"].mean()
    #     CW_av = cols_dict["chiw"].mean()
    #     CC_av = cols_dict["chic"].mean()
    #     TL_av = self.circ_avg(cols_dict["buckle"])
    #     RL_av = self.circ_avg(cols_dict["propel"])
    #     TW_av = self.circ_avg(cols_dict["opening"])
    cols_arr = [cols_dict[coord] for coord in coordinates]
    cols_arr = np.array(cols_arr).T

    cv = ma.cov(ma.masked_invalid(cols_arr), rowvar=False)
    cv.filled(np.nan)

    cov_df = pd.DataFrame(cv, columns=coordinates, index=coordinates)
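    # The elastic force constants come from inverting the covariance matrix of the
    # helical coordinates, K = KT * C^-1 (KT is roughly kT in kcal/mol at 298 K), so
    # coordinates that fluctuate less end up with larger stiffness values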
    stiff = np.linalg.inv(cv) * KT
    #print(stiff)
    # Added two new variables: ChiC and ChiW -> 8 (for PENTAMERS) => in NASSA (here only for hexamers)
    if (6 % 2) == 0:
        last_row = [SH_av, SL_av, RS_av, TL_av, RL_av, TW_av] #, CW_av, CC_av]
        stiff = np.append(stiff, last_row).reshape(7, 6)
    # elif (self.unit_len % 2) == 1:
    #     last_row = [SH_av, SL_av, RS_av, TL_av, RL_av, TW_av, CW_av, CC_av]
    #     print(last_row)
    #     stiff = np.append(stiff, last_row).reshape(9, 8)
    #     scaling=[1, 1, 1, 10.6, 10.6, 10.6, 1, 1]

    stiff = stiff.round(6)
    stiff_diag = np.diagonal(stiff) * np.array(scaling)

    cte = pd.DataFrame(stiff)
    cte.columns = coordinates
    cte.index = coordinates + ["avg"]
    return stiff_diag, cte, cov_df

# Function to check whether the file is from one block or another of the Helical Parameters; the value of baselen changes accordingly
def checking(helpar_name):
    # get the base length from the helical parameter name
    baselen = 0
    if helpar_name.lower() in hp_basepairs:
        baselen = 1
    elif helpar_name.lower() in hp_singlebases:
        baselen = 0
    return baselen

# Function to check whether the file is from one block or another of the Helical Parameters; the values of baselen and hp_unit change accordingly
def checking2(helpar_name):
    # get the base length and unit from the helical parameter name
    if helpar_name.lower() in hp_singlebases:
        baselen = 0
    else:
        baselen = 1
    if helpar_name.lower() in hp_angular:
        hp_unit = "Degrees"
    else:
        hp_unit = "Angstroms"
    return baselen,hp_unit
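# For example, checking2('twist') would return (1, "Degrees") while checking2('shear')
# would return (0, "Angstroms")
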
# Function to compute the inverse complement of a DNA or RNA sequence; it is commented out because it is not used, but it is kept here in case it is needed
'''
# Compute inverse complement DNA or RNA sequence
def reverse_sequence(sequence,DNA=True):
    if DNA: # If the DNA flag is true we want to compute the inverse using T instead of U
        A_base = "T"
    else: # Now it is RNA so we want to convert T to U
        A_base = "U"
    inverse = {"A":A_base,"G":"C","C":"G",A_base:"A"} # Dictionary to convert the sequence easily
    inv_seq = ""
    for i in sequence[::-1]: # Traverse the sequence from the end to the beginning
        inv_seq += inverse[i] # Obtain the complementary base
    return inv_seq # Return the inverse complement
'''

# Function to read a given .ser file and transform it into a pandas dataframe
def read_series(input_serfile, usecols=None):
    """Read a .ser file"""
    extra_kwargs = dict(
        header=None,
        sep='\\s+',
        index_col=0)
    ser_data = pd.read_csv(input_serfile, **extra_kwargs)
    if usecols is not None:
        if 0 in usecols:
            usecols.pop(usecols.index(0))
        ser_data = ser_data[[i+1 for i in usecols]]
    return ser_data
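# For example, read_series('canal_output_twist.ser') should yield a DataFrame with one
# row per frame (the first column of the file is used as the index) and one column per
# base-pair step
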
# AVERAGE FUNCTIONS TO COMPUTE THE AVERAGE OF THE FOLLOWING BLOCKS OF HELICAL PARAMETERS:
# GROOVES, INTRA BASE PAIR, INTER BASE PAIR, AXIS BASE PAIR

# FUNCTION TO CONTROL THE FLOW OF FILES THAT ENTER THE FUNCTION TO BE ANALYZED IN ORDER TO COMPUTE THE AVERAGE (MEAN AND STANDARD DEVIATION)
def flow_files_average(files,info_dict,sequence):
    for file in files: # Iterate over all files that must be analyzed
        word = file.split('.')
        helpword = word[0].split('_')[-1].lower()
        # Obtain the baselen value depending on the file
        baselen = checking(helpword)
        # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
        dataframe = read_series(file)
        df1,df2 = average_std(dataframe,baselen,sequence)
        df1i,df2i = average_std_intra(dataframe,baselen,sequence)
        if helpword in ["roll","tilt","twist","rise","shift","slide"]:
            # INTER BASEPAIR BLOCK
            info_dict['avg_res']['interbp'][helpword] = {'avg':{},'std':{}}
            # we want to store all the information regarding the averages
            info_dict['avg_res']['interbp'][helpword]['avg'] = df1.T.values.tolist()

            # we want to store all the information regarding the standard deviations
            info_dict['avg_res']['interbp'][helpword]['std'] = df2.T.values.tolist()

        elif helpword in ["shear","stagger","stretch","buckle","opening","propel"]:
            # INTRA BASEPAIR BLOCK
            info_dict['avg_res']['intrabp'][helpword] = {'avg':{},'std':{}}
            # we want to store all the information regarding the averages
            info_dict['avg_res']['intrabp'][helpword]['avg'] = df1i.T.values.tolist()

            # we want to store all the information regarding the standard deviations
            info_dict['avg_res']['intrabp'][helpword]['std'] = df2i.T.values.tolist()

        elif helpword in ["xdisp","ydisp","inclin","tip"]:
            # AXIS BASEPAIR BLOCK
            info_dict['avg_res']['axisbp'][helpword] = {'avg':{},'std':{}}

            # we want to store all the information regarding the averages
            info_dict['avg_res']['axisbp'][helpword]['avg'] = df1i.T.values.tolist()

            # we want to store all the information regarding the standard deviations
            info_dict['avg_res']['axisbp'][helpword]['std'] = df2i.T.values.tolist()

        else:
            # GROOVES BLOCK
            info_dict['avg_res']['grooves'][helpword] = {'avg':{},'std':{}}

            # we want to store all the information regarding the averages
            info_dict['avg_res']['grooves'][helpword]['avg'] = df1.T.values.tolist()

            # we want to store all the information regarding the standard deviations
            info_dict['avg_res']['grooves'][helpword]['std'] = df2.T.values.tolist()

    return info_dict # Return the dictionary with all the information computed (averages and standard deviations)

# Compute average and standard deviation
def average_std(dataf,baselen,sequence):
    # For hexamers, baselen has to be 2
    baselen = 2
    # discard first and last base(pairs) from sequence
    dataf = dataf[dataf.columns[2:17]]
    # sequence = sequence[1:]
    # print("sequence: ",sequence)
    xlabels = [
        f"{sequence[i:i+1+baselen]}"
        for i in range(len(dataf.columns))] # - baselen
    means = dataf.mean(axis=0).iloc[:len(xlabels)]
    stds = dataf.std(axis=0).iloc[:len(xlabels)]
    return means,stds

def average_std_intra(dataf,baselen,sequence):
    # For hexamers, baselen has to be 2
    baselen = 2
    # discard first and last base(pairs) from sequence
    dataf = dataf[dataf.columns[2:18]]
    # sequence = sequence[1:]
    # print("sequence: ",sequence)
    xlabels = [
        f"{sequence[i:i+1+baselen]}"
        for i in range(len(dataf.columns))] # - baselen
    means = dataf.mean(axis=0).iloc[:len(xlabels)]
    stds = dataf.std(axis=0).iloc[:len(xlabels)]
    return means,stds

# OTHER FUNCTIONS THAT PERFORM OTHER CALCULATIONS REGARDING THE LAST BLOCK OF HELICAL PARAMETERS (BACKBONE TORSIONS)
# THERE ARE 3 DIFFERENT FUNCTIONS, EACH ONE COMPUTING A DIFFERENT ASPECT OF THE FLEXIBILITY OF THE BACKBONE:
# SUGAR PUCKERING, CANONICAL ALPHA/GAMMA AND BI/BII POPULATIONS

# FUNCTION TO CONTROL THE FLOW OF FILES
def flow_files_backbone(files,info_dict):
    # Store files related to Puckering computations
    puckering_files = []
    # Store files related to Canonical Alpha Gamma computations
    canonicalg_files = []
    # Store files related to BI and BII populations
    bipopulations_files = []
    # Iterate over all files that must be analyzed
    for file in files:
        word = file.split('.')
        helpword = word[0].split('_')[-1].lower()
        # Sugar Puckering
        if helpword in ['phasec','phasew']:
            puckering_files.append(file)
        # Canonical Alpha/Gamma
        elif helpword in ['alphac','alphaw','gammac','gammaw']:
            canonicalg_files.append(file)
        # BI/BII Population
        elif helpword in ['epsilc','epsilw','zetac','zetaw']:
            bipopulations_files.append(file)
    info_dict['avg_res']['backbone'] = {'puckering':{'north':{},'east':{},'west':{},'south':{}},'bi':{},'bii':{},'canonical_alphagamma':{}}
    # Call function to perform computations regarding Sugar Puckering
    npop, epop, wpop, spop = sugar_puckering(puckering_files)
    info_dict['avg_res']['backbone']['puckering']['north'] = npop.T.values.tolist()
    info_dict['avg_res']['backbone']['puckering']['east'] = epop.T.values.tolist()
    info_dict['avg_res']['backbone']['puckering']['west'] = wpop.T.values.tolist()
    info_dict['avg_res']['backbone']['puckering']['south'] = spop.T.values.tolist()
    # Call function to perform computations regarding Canonical Alpha Gamma
    canon = canonical_alphagamma(canonicalg_files)
    info_dict['avg_res']['backbone']['canonical_alphagamma'] = canon.T.values.tolist()
    # Call function to perform computations regarding BI and BII populations
    b1,b2 = bi_populations(bipopulations_files)
    info_dict['avg_res']['backbone']['bi'] = b1.T.values.tolist()
    info_dict['avg_res']['backbone']['bii'] = b2.T.values.tolist()
    return info_dict

######## SUGAR PUCKERING

# READ FILES AND CORRECT ANGLES
def sugar_puckering(puckfiles):
    for file in puckfiles: # Iterate over all files
        if file[-5:] == "C.ser":
            phasec = file
        else:
            phasew = file

    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    phasec = read_series(phasec)
    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    phasew = read_series(phasew)
    # Fix angles that are negative
    phasec = fix_angles(phasec)
    # Fix angles that are negative
    phasew = fix_angles(phasew)

    return cpuck(phasec,phasew)

# Correct angle values, as we don't want negative angle measurements
def fix_angles(dataset):
    values = np.where(dataset < 0, dataset + 360, dataset)
    dataset = pd.DataFrame(values)
    return dataset

# Compute sugar puckering, using 2 files phaseC and phaseW
def cpuck(phaseC,phaseW):
    separator_df = pd.DataFrame({"-": np.nan}, index=range(1, len(phaseC)))
    phase = pd.concat([
        phaseW,
        separator_df,
        phaseC[phaseC.columns[::-1]]],
        axis=1)

    Npop = np.logical_or(phase > 315, phase < 45).mean() * 100
    Epop = np.logical_and(phase > 45, phase < 135).mean() * 100
    Wpop = np.logical_and(phase > 225, phase < 315).mean() * 100
    Spop = np.logical_and(phase > 135, phase < 225).mean() * 100

    return Npop, Epop, Wpop, Spop
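# For example, a pseudorotation phase of 20 degrees counts toward the North population
# and one of 170 degrees toward the South population; each population is reported as the
# percentage of frames falling in its quadrant
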
######## CANONICAL ALPHA/GAMMA

# READ FILES AND CORRECT ANGLES REGARDING CANONICAL ALPHA GAMMA
def canonical_alphagamma(canonfiles):
    for file in canonfiles:
        if file[-7:] == "haC.ser":
            alphac = file
        elif file[-7:] == "haW.ser":
            alphaw = file
        elif file[-7:] == "maC.ser":
            gammac = file
        elif file[-7:] == "maW.ser":
            gammaw = file

    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    alphac = read_series(alphac)
    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    alphaw = read_series(alphaw)
    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    gammac = read_series(gammac)
    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    gammaw = read_series(gammaw)
    # Fix angles that are negative or over 360 degrees
    alphac = fix_angles2(alphac)
    # Fix angles that are negative or over 360 degrees
    alphaw = fix_angles2(alphaw)
    # Fix angles that are negative or over 360 degrees
    gammac = fix_angles2(gammac)
    # Fix angles that are negative or over 360 degrees
    gammaw = fix_angles2(gammaw)

    return check_alpgamm(alphac,gammac,alphaw,gammaw)

# Correct angle values, as we don't want negative angle measurements or angles of more than 360 degrees
def fix_angles2(dataset):
    values = np.where(dataset < 0, dataset + 360, dataset)
    values = np.where(values > 360, values - 360, values)
    dataset = pd.DataFrame(values)
    return dataset
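# For example, fix_angles2 maps an angle of -30 degrees to 330 degrees and one of 370
# degrees to 10 degrees, so typical out-of-range values end up within the 0-360 range
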
# Compute canonical populations using 4 different files alphaC, gammaC, alphaW and gammaW
def check_alpgamm(alphaC,gammaC,alphaW,gammaW):
    separator_df = pd.DataFrame({"-": np.nan}, index=range(len(gammaW)))
    gamma = pd.concat([
        gammaW,
        separator_df,
        gammaC[gammaC.columns[::-1]]],
        axis=1)
    alpha = pd.concat([
        alphaW,
        separator_df,
        alphaC[alphaC.columns[::-1]]],
        axis=1)
    alpha_filter = np.logical_and(alpha > 240, alpha < 360)
    gamma_filter = np.logical_and(gamma > 0, gamma < 120)
    canonical_alpha_gamma = np.logical_and(
        alpha_filter, gamma_filter).mean() * 100

    return canonical_alpha_gamma
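# The filters above count a frame as canonical when alpha falls in the gauche- region
# (240-360 degrees) and gamma in the gauche+ region (0-120 degrees); the result is
# reported as a percentage of frames per residue
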
# BI / BII POPULATIONS

def bi_populations(bfiles):
    for file in bfiles:
        if file[-7:] == "ilC.ser":
            epsilc = file
        elif file[-7:] == "ilW.ser":
            epsilw = file
        elif file[-7:] == "taC.ser":
            zetac = file
        elif file[-7:] == "taW.ser":
            zetaw = file

    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    epsilc = read_series(epsilc)
    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    epsilw = read_series(epsilw)
    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    zetac = read_series(zetac)
    # Convert the file into a Pandas dataframe to make the computations and data manipulation easier
    zetaw = read_series(zetaw)
    # Fix angles that are negative or over 360 degrees
    epsilc = fix_angles2(epsilc)
    # Fix angles that are negative or over 360 degrees
    epsilw = fix_angles2(epsilw)
    # Fix angles that are negative or over 360 degrees
    zetac = fix_angles2(zetac)
    # Fix angles that are negative or over 360 degrees
    zetaw = fix_angles2(zetaw)
    # Compute the differences between the Epsilon and Zeta values
    diff_epsil_zeta = angles_diff_ze(epsilc,zetac,epsilw,zetaw)
    BI,BII = bi_pop(diff_epsil_zeta)
    return BI,BII

# Compute the difference between epsilon and zeta
def angles_diff_ze(epsilC, zetaC, epsilW, zetaW):
    # concatenate zeta and epsil arrays
    separator_df = pd.DataFrame({"-": np.nan}, index=range(len(zetaW)))
    zeta = pd.concat([
        zetaW,
        separator_df,
        zetaC[zetaC.columns[::-1]]],
        axis=1)
    epsil = pd.concat([
        epsilW,
        separator_df,
        epsilC[epsilC.columns[::-1]]],
        axis=1)

    # difference between epsilon and zeta coordinates
    diff_epsil_zeta = epsil - zeta
    return diff_epsil_zeta

# Compute BI and BII populations
def bi_pop(diff_epsil_zeta):
    BI = (diff_epsil_zeta < 0).sum(axis=0) * 100 / len(diff_epsil_zeta)
    BII = 100 - BI
    return BI, BII
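# For example, if epsilon - zeta is negative in 60 out of 100 frames for a given
# backbone step, that step is reported as 60% BI and 40% BII
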
######## TIME SERIES

# Function to distribute all .ser files in order to compute later Time Series
def flow_files_timeseries(files_average,files_backbone,info_dict,frames_limit):
    for file in files_average: # Iterate over all files that must be analyzed
        word = file.split('.')
        helpword = word[0].split('_')[-1].lower()
        df = time_series(file,helpword,frames_limit)

        # Store all computations from files that are related to the block Inter Basepair
        if helpword in ["roll","tilt","twist","rise","shift","slide"]:
            info_dict['ts']['interbp'][helpword] = {}
            # Store all information
            info_dict['ts']['interbp'][helpword] = df.T.values.tolist()

        # Store all computations from files that are related to the block Intra Basepair
        elif helpword in ["shear","stagger","stretch","buckle","opening","propel"]:
            info_dict['ts']['intrabp'][helpword] = {}
            # Store all information
            info_dict['ts']['intrabp'][helpword] = df.T.values.tolist()

        # Store all computations from files that are related to the block Axis Basepair
        elif helpword in ["xdisp","ydisp","inclin","tip"]:
            info_dict['ts']['axisbp'][helpword] = {}
            # Store all information
            info_dict['ts']['axisbp'][helpword] = df.T.values.tolist()

        else:
            # Store all computations from files that are related to the block Grooves
            info_dict['ts']['grooves'][helpword] = {}
            # Store all information
            info_dict['ts']['grooves'][helpword] = df.T.values.tolist()

    for file2 in files_backbone:
        word = file2.split('.')
        helpword = word[0].split('_')[-1].lower()
        df1 = time_series(file2,helpword,frames_limit)
        # Store all computations from files that are related to the block Backbone Torsions
        info_dict['ts']['backbone'][helpword] = {}
        # Store all information
        info_dict['ts']['backbone'][helpword] = df1.T.values.tolist()

    return info_dict

# Function to compute the Time Series for each file that flow_files_timeseries() passes
def time_series(file,word,reduced_trajectory_frames_limit):
    # Note that the sequence is hardcoded here
    sequence = "GCTTTTAGCGGTTGAACGGC"
    baselen,hp_unit = checking2(word)
    ser_data = read_series(file)
    ser_data = ser_data[ser_data.columns[1:-1]]
    sequence = sequence[1:]

    '''
    subunits = [
        f"{i+1}_{sequence[i:i+1+baselen]}"
        for i in range(len(ser_data.columns))]
    ser_data.columns = subunits
    '''
    snapshots = len(ser_data.index)

    step = math.ceil(snapshots)# / reduced_trajectory_frames_limit)

    ser_data = ser_data[0:len(ser_data.index):step]
    return ser_data