Coverage for model_workflow/analyses/nassa.py: 59%
684 statements
coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1from itertools import product, combinations_with_replacement
2import numpy.ma as ma
3import pandas as pd
4import numpy as np
5import os
6from os import chdir, getcwd
7from pathlib import Path
9from model_workflow.tools.nassa_base import Base
10from model_workflow.tools.nassa_loaders import load_sequence
11from model_workflow.tools.nassa_loaders import load_serfile
12from model_workflow.utils.constants import NASSA_ANALYSES_CANALS, BLUE_HEADER, COLOR_END, CYAN_HEADER, GREEN_HEADER
14from model_workflow.utils.heatmaps_nassa import basepair_plot
15from model_workflow.utils.heatmaps_nassa import bconf_heatmap
16from model_workflow.utils.heatmaps_nassa import correlation_plot
17from model_workflow.utils.heatmaps_nassa import arlequin_plot
18from model_workflow.utils.bibitransformer_nassa import BiBiTransformer
19from model_workflow.utils.auxiliar import InputError
20from model_workflow.utils.nassa_file import generate_nassa_config
21from typing import Optional, List
22import yaml
24# There are five analyses in total
26# BASE PAIR CORRELATION ANALYSIS
27class BasePairCorrelation(Base):
28 """
29 Execution plan and methods for basepair correlation analysis.
30 """
32 def __init__(self, **kwargs):
33 super().__init__(**kwargs)
35 def extract(self):
36 extracted = {}
37 sequences = []
38 for seq_file in self.sequence_files:
39 seq = load_sequence(
40 seq_file,
41 unit_name=self.unit_name,
42 unit_len=self.unit_len)
43 sequences.append(seq)
44 self.Ac = seq.Ac
45 self.logger.debug(f"Adenine complement set to: <{self.Ac}>")
46 extracted["sequences"] = sequences
47 self.logger.info(f"loaded {len(sequences)} sequences")
49 # Iterate over the different parameters: shift, twist, roll, slide, tilt, rise
50 bpcorr_coordinates = NASSA_ANALYSES_CANALS['bpcorr']
51 for bpcorr_coord in bpcorr_coordinates:
52 for coord in self.coordinate_info.keys():
53 if coord == bpcorr_coord:
54 crd_data = []
55 for ser_file in self.coordinate_info[coord]:
56 ser = load_serfile(
57 ser_file,
58 self.tail,
59 self.n_lines)
60 crd_data.append(ser)
61 extracted[coord.lower()] = crd_data
62 self.logger.info(f"loaded {len(crd_data)} files for coordinate <{coord}>")
64 return extracted
66 def extraer_tetramero_central(self, hexamero):
67 return hexamero[1:5]
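# Helper that slices the central tetramer out of a hexamer, e.g. "AGCTAG"[1:5] -> "GCTA";
# it does not appear to be called elsewhere in this module.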
69 def transform(self, data):
70 sequences = data.pop("sequences")
71 # iterate over trajectories
72 corr_results = {}
73 for traj, seq in enumerate(sequences):
74 trajectory_series = {coord.lower(): data[coord][traj]
75 for coord in data.keys()}
76 coordinate_corr = self.iterate_trajectory(
77 seq, trajectory_series)
78 corr_results[seq.sequence] = coordinate_corr
79 joined_df = []
80 for seq, val in corr_results.items():
81 df = pd.DataFrame.from_dict(val).T
82 joined_df.append(df)
83 joined_df = pd.concat(joined_df)
84 return joined_df
86 def iterate_trajectory(self, sequence, coordinates):
87 coordinate_correlations = {}
88 # iterate over subunits
89 start = 2 + sequence.flanksize
90 end = sequence.size - (2 + sequence.baselen + sequence.flanksize - 1)
91 # subtract 1 from end in order to stop at the last subunit
92 for idx in range(start, end-1):
93 subunit = sequence.get_subunit(idx)
94 next_subunit = sequence.get_subunit(idx+1)
95 if subunit in coordinate_correlations:
96 self.logger.info(
97 f"skipping repeated {self.unit_name} {subunit}...")
98 continue
99 self.logger.info(
100 f"analyzing {self.unit_name} {subunit}...")
101 # add 1 to idx since .ser table includes an index
102 unit_df = pd.DataFrame(
103 {coord: coordinates[coord][idx+1]
104 for coord in coordinates.keys()})
105 next_unit_df = pd.DataFrame(
106 {coord: coordinates[coord][idx+2]
107 for coord in coordinates.keys()})
108 crd_corr = self.get_correlation(next_unit_df, unit_df) # inverse order ?
109 coordinate_correlations[f"{subunit}/{next_subunit}"] = crd_corr
110 return coordinate_correlations
112 def get_correlation(self, unit, next_unit):
113 method = {
114 "shift": "linear",
115 "slide": "linear",
116 "rise": "linear",
117 "tilt": "circular",
118 "roll": "circular",
119 "twist": "circular"}
120 coordinates = method.keys()
121 combos = product(coordinates, repeat=2)
122 result = {}
123 for crd1, crd2 in combos:
124 method1 = method[crd1]
125 method2 = method[crd2]
126 arr1 = unit[crd1]
127 arr2 = next_unit[crd2]
128 value = self.get_corr_by_method(
129 method1,
130 method2,
131 arr1,
132 arr2)
133 result[f"{crd1}/{crd2}"] = value
134 return result
136 def get_corr_by_method(self, method1, method2, arr1, arr2):
137 if method1 == "circular" and method2 == "linear":
138 value = self.circlineal(arr2, arr1)
139 if method1 == "linear" and method2 == "circular":
140 value = self.circlineal(arr1, arr2)
141 elif method1 == "circular" and method2 == "circular":
142 value = self.circular(arr1, arr2)
143 else:
144 value = ma.corrcoef(ma.masked_invalid(arr1), ma.masked_invalid(arr2))[1, 0]
145 return value
147 @staticmethod
148 def circular(x1, x2):
149 x1 = x1 * np.pi / 180
150 x2 = x2 * np.pi / 180
151 diff_1 = np.sin(x1 - x1.mean())
152 diff_2 = np.sin(x2 - x2.mean())
153 num = (diff_1 * diff_2).sum()
154 den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum())
155 return num / den
157 @staticmethod
158 def circlineal(x1, x2):
159 x2 = x2 * np.pi / 180
160 x1 = x1.to_numpy()
161 x2 = x2.to_numpy()
162 rc = ma.corrcoef(ma.masked_invalid(x1), ma.masked_invalid(np.cos(x2)))[1, 0]
163 rs = ma.corrcoef(ma.masked_invalid(x1), ma.masked_invalid(np.sin(x2)))[1, 0]
164 rcs = ma.corrcoef(ma.masked_invalid(np.sin(x2)), ma.masked_invalid(np.cos(x2)))[1, 0]
165 num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs
166 den = 1 - (rcs ** 2)
167 correlation = np.sqrt(num / den)
168 if ma.corrcoef(ma.masked_invalid(x1), ma.masked_invalid(x2))[1, 0] < 0:
169 correlation *= -1
170 return correlation
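# A quick sanity check on the two helpers above (it follows directly from the formulas):
# for two identical angle series the circular coefficient is exactly 1, e.g.
# circular(pd.Series([10.0, 20.0, 30.0]), pd.Series([10.0, 20.0, 30.0])) -> 1.0,
# while circlineal() yields the magnitude of the linear-circular association with its
# sign taken from the plain Pearson correlation between the two series.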
172 def make_tables(self, dataset):
173 dataset.to_csv(self.save_path / "all_basepairs.csv")
175 def make_plots(self, dataset):
176 basepair_plot(dataset, "all_basepairs", self.save_path)
178# BASE PAIR CONFORMATION (BI/BII) ANALYSIS
180class BConformations(Base):
181 """Execution plan and methods for BI/BII conformations analysis pipeline"""
183 def __init__(self, **kwargs):
184 super().__init__(**kwargs)
185 valid_coordinates = ["epsilC", "zetaC", "epsilW", "zetaW"]
186 for coord in self.coordinate_info.keys():
187 if coord not in valid_coordinates:
188 raise ValueError(
189 f"{coord} is not a valid coordinate! "
190 "Please rename coordinates in your configuration file "
191 f"to match any of {valid_coordinates}")
195 def extract(self):
196 extracted = {}
197 sequences = []
198 for seq_file in self.sequence_files:
199 seq = load_sequence(
200 seq_file,
201 unit_name=self.unit_name,
202 unit_len=self.unit_len)
203 sequences.append(seq)
204 self.Ac = seq.Ac
205 self.logger.debug(f"Adenine complement set to: <{self.Ac}>")
206 extracted["sequences"] = sequences
207 self.logger.info(f"loaded {len(sequences)} sequences")
208 bconf_coordinates = NASSA_ANALYSES_CANALS['bconf']
209 for bconf_coord in bconf_coordinates:
210 for coord in self.coordinate_info.keys():
211 if coord == bconf_coord:
212 crd_data = []
213 for ser_file in self.coordinate_info[coord]:
214 ser = load_serfile(
215 ser_file,
216 self.tail,
217 self.n_lines)
218 crd_data.append(ser)
219 extracted[coord.lower()] = crd_data
220 self.logger.info(
221 f"loaded {len(crd_data)} files for coordinate <{coord}>")
222 return extracted
224 def transform(self, data):
225 sequences = data.pop("sequences")
226 angles_df = []
227 # get dataframe for each coordinate
228 for traj, seq in enumerate(sequences):
229 # start reading from the 4th column:
230 # skip index, first two bases and first flanks
231 start = 3 + seq.flanksize
232 # skip last two bases
233 end = seq.size - (seq.baselen + seq.flanksize)
234 # select relevant subset of columns
235 epsilC = data["epsilc"][traj][start:end]
236 zetaC = data["zetac"][traj][start:end]
237 epsilW = data["epsilw"][traj][start:end]
238 zetaW = data["zetaw"][traj][start:end]
239 traj_df = self.get_angles_difference(
240 seq,
241 epsilC,
242 zetaC,
243 epsilW,
244 zetaW)
245 angles_df.append(traj_df)
246 angles_df = pd.concat(angles_df, axis=1)
247 # percentages BI
248 B_I = (angles_df < 0).sum(axis=0) * 100 / len(angles_df) # self.n_lines
249 # clean dataset
250 B_I = B_I[~B_I.index.duplicated(keep='first')]
251 B_I = B_I.reset_index()
252 B_I = B_I.rename(columns={"index": self.unit_name, 0: "pct"})
253 return B_I
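# For reference: B_I counts, per subunit column, the fraction of frames where
# epsilon - zeta < 0. E.g. if 750 of 1000 frames of a tetramer give a negative
# difference, its BI percentage is 75.0 and the complementary BII value written by
# make_tables() below is 25.0.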
255 def get_angles_difference(self, seq, epsilC, zetaC, epsilW, zetaW):
256 # get list of tetramers, except first and last two bases
257 all_subunits = seq.all_subunits[2:-2]
258 all_ic_subunits = seq.all_ic_subunits[2:-2]
260 # concatenate zeta and epsil arrays
261 zeta = pd.concat([
262 pd.DataFrame(zetaW.T),
263 pd.DataFrame(zetaC[::-1].T)],
264 axis=1)
265 zeta.columns = all_subunits * 2
266 epsil = pd.concat([
267 pd.DataFrame(epsilW.T),
268 pd.DataFrame(epsilC[::-1].T)],
269 axis=1)
271 # This function was contained in a separate script in the original NASSA code (angle_utils.py)
272 def fix_angle_range(x, domain=[0, 360]):
273 """Fix angle range so it's in the given angle range (degrees) by adding or subtracting 360.
275 :param float x: angle value (asumed to be in degrees)
276 :param sequence domain: start and end of angle range.
278 : return float: angle value with fixed range """
279 while x < domain[0]:
280 x += 360
281 while x > domain[1]:
282 x -= 360
283 return x
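# Worked example: with domain=[-180, 180], fix_angle_range(270) -> -90 and
# fix_angle_range(-200) -> 160; this is how the epsilon - zeta differences below are
# wrapped before the BI/BII classification.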
285 epsil.columns = all_subunits * 2
286 # difference between epsilon and zeta coordinates
287 diff = epsil - zeta
288 diff = diff.applymap(lambda x: fix_angle_range(x, domain=[-180, 180]))
290 # repeat with inverse-complementary sequence
291 zeta_ic = pd.concat([
292 pd.DataFrame(zetaW.T),
293 pd.DataFrame(zetaC[::-1].T)],
294 axis=1)
295 zeta_ic.columns = all_ic_subunits * 2
296 epsil_ic = pd.concat([
297 pd.DataFrame(epsilW.T),
298 pd.DataFrame(epsilC[::-1].T)],
299 axis=1)
300 epsil_ic.columns = all_ic_subunits * 2
301 diff_ic = epsil_ic - zeta_ic
302 diff_ic = diff_ic.applymap(
303 lambda x: fix_angle_range(x, domain=[-180, 180]))
305 diff_df = pd.concat([diff, diff_ic], axis=1)
307 return diff_df
309 def make_tables(self, B_I):
310 B_I.to_csv(self.save_path / "BI.csv", index=False)
311 # create BII
312 B_II = B_I.copy()
313 B_II["pct"] = 100 - B_I["pct"]
314 B_II.to_csv(self.save_path / "BII.csv", index=False)
316 def make_plots(self, B_I):
317 bconf_heatmap(B_I, "BI", self.save_path, self.unit_len, self.Ac)
318 B_II = B_I.copy()
319 B_II["pct"] = 100 - B_I["pct"]
320 bconf_heatmap(B_II, "BII", self.save_path, self.unit_len, self.Ac)
322# COORDINATE CORRELATION ANALYSIS
323class CoordinateCorrelation(Base):
324 """
325 Execution plan and methods for correlation analyses
326 """
328 def __init__(self, **kwargs):
329 super().__init__(**kwargs)
331 def extract(self):
332 extracted = {}
333 sequences = []
334 for seq_file in self.sequence_files:
335 seq = load_sequence(
336 seq_file,
337 unit_name=self.unit_name,
338 unit_len=self.unit_len)
339 sequences.append(seq)
340 self.Ac = seq.Ac
341 self.logger.debug(f"Adenine complement set to: <{self.Ac}>")
342 extracted["sequences"] = sequences
343 self.logger.info(f"loaded {len(sequences)} sequences")
345 crdcorr_coordinates = NASSA_ANALYSES_CANALS['crdcorr']
346 for crdcorr_coord in crdcorr_coordinates:
347 for coord in self.coordinate_info.keys():
348 if coord == crdcorr_coord:
349 crd_data = []
350 for ser_file in self.coordinate_info[coord]:
351 ser = load_serfile(
352 ser_file,
353 self.tail,
354 self.n_lines)
355 crd_data.append(ser)
356 extracted[coord.lower()] = crd_data
357 self.logger.info(
358 f"loaded {len(crd_data)} files for coordinate <{coord}>")
359 return extracted
361 def transform(self, data):
362 sequences = data.pop("sequences")
363 # iterate over trajectories
364 corr_results = {}
365 for traj, seq in enumerate(sequences):
366 trajectory_series = {coord.lower(): data[coord][traj]
367 for coord in data.keys()}
368 correlations = self.iterate_trajectory(
369 seq, trajectory_series)
370 corr_results[traj] = correlations
371 return corr_results
373 def iterate_trajectory(self, sequence, coordinates):
374 corrtype = {
375 "shift": "linear",
376 "slide": "linear",
377 "rise": "linear",
378 "tilt": "circular",
379 "roll": "circular",
380 "twist": "circular"}
381 start = 3 + sequence.flanksize
382 end = sequence.size - sequence.flanksize - sequence.baselen
383 all_subunits = sequence.all_subunits[2:-2]
384 results_df = {}
385 for crd1, crd2 in combinations_with_replacement(
386 coordinates.keys(),
387 r=2):
388 crd1_series = pd.DataFrame(data=coordinates[crd1][start:end]).T
389 crd2_series = pd.DataFrame(data=coordinates[crd2][start:end]).T
390 crd1_series.columns = all_subunits
391 crd2_series.columns = all_subunits
392 method = self.get_corr_method(corrtype[crd1], corrtype[crd2])
393 corr_matrix = pd.DataFrame({
394 col: crd1_series.corrwith(
395 crd2_series[col],
396 method=method
397 ) for col in crd2_series.columns})
398 crd1_inner_dict = results_df.get(crd1, {})
399 crd1_inner_dict[crd2] = corr_matrix
400 results_df[crd1] = crd1_inner_dict
402 # build complete dataset
403 coords = list(results_df.keys())
404 for crd1, inner_dict in results_df.items():
405 missing_coords = [
406 crd for crd in coords if crd not in inner_dict.keys()]
407 for crd2 in missing_coords:
408 results_df[crd1][crd2] = results_df[crd2][crd1]
409 dfs = {k: pd.concat(v) for k, v in results_df.items()}
410 complete_df = pd.concat(dfs, axis=1)
411 results_df["complete"] = complete_df
412 return results_df
414 def get_corr_method(self, corrtype1, corrtype2):
415 if corrtype1 == "circular" and corrtype2 == "linear":
416 method = self.circlineal
417 if corrtype1 == "linear" and corrtype2 == "circular":
418 method = self.circlineal
419 elif corrtype1 == "circular" and corrtype2 == "circular":
420 method = self.circular
421 else:
422 method = "pearson"
423 return method
425 @staticmethod
426 def circular(x1, x2):
427 x1 = x1 * np.pi / 180
428 x2 = x2 * np.pi / 180
429 diff_1 = np.sin(x1 - x1.mean())
430 diff_2 = np.sin(x2 - x2.mean())
431 num = (diff_1 * diff_2).sum()
432 den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum())
433 return num / den
435 @staticmethod
436 def circlineal(x1, x2):
437 x2 = x2 * np.pi / 180
438 rc = np.corrcoef(x1, np.cos(x2))[1, 0]
439 rs = np.corrcoef(x1, np.sin(x2))[1, 0]
440 rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
441 num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs
442 den = 1 - (rcs ** 2)
443 correlation = np.sqrt(num / den)
444 if np.corrcoef(x1, x2)[1, 0] < 0:
445 correlation *= -1
446 return correlation
448 def make_tables(self, dataset):
449 for traj, data in dataset.items():
450 # make a new directory to save data for each trajectory
451 save_subpath = self.save_path / f"traj_{traj}"
452 save_subpath.mkdir(exist_ok=True)
453 # data.to_csv(save_subpath / "coordinate_corr.csv")
454 # coords = data.columns.get_level_values(0).unique()
455 for crd1, inner_dict in data.items():
456 if crd1 == "complete":
457 inner_dict.to_csv(save_subpath / "coordinate_corr.csv")
458 else:
459 for crd2, df in inner_dict.items():
460 df.to_csv(save_subpath / f"{crd1}_{crd2}.csv")
462 def make_plots(self, dataset):
463 complete_dataset = pd.concat(
464 [value['complete'] for value in dataset.values()],
465 axis=1)
466 save_subpath = self.save_path / "complete"
467 save_subpath.mkdir(exist_ok=True)
468 correlation_plot(
469 complete_dataset,
470 "coordcorr",
471 save_subpath)
473# COORDINATE DISTRIBUTION ANALYSIS
474class CoordinateDistributions(Base):
475 """
476 Execution plan and methods for coordinate distributions analysis pipeline
477 """
479 def __init__(
480 self,
481 max_iter=400,
482 tol=1e-5,
483 **kwargs):
484 super().__init__(**kwargs)
485 self.max_iter = max_iter
486 self.tol = tol
487 self.logger.debug(f"max_iter: {self.max_iter}")
488 self.logger.debug(f"tol: {self.tol}")
490 def extract(self):
491 extracted = {}
492 sequences = []
493 for seq_file in self.sequence_files:
494 seq = load_sequence(
495 seq_file,
496 unit_name=self.unit_name,
497 unit_len=self.unit_len)
498 sequences.append(seq)
499 self.Ac = seq.Ac
500 self.logger.debug(f"Adenine complement set to: <{self.Ac}>")
501 extracted["sequences"] = sequences
502 self.logger.info(f"loaded {len(sequences)} sequences")
504 coordist_coordinates = NASSA_ANALYSES_CANALS['coordist']
505 for coordist_coord in coordist_coordinates:
506 for coord in self.coordinate_info.keys():
507 if coord == coordist_coord:
508 crd_data = []
509 for ser_file in self.coordinate_info[coord]:
510 ser = load_serfile(
511 ser_file,
512 self.tail,
513 self.n_lines)
514 crd_data.append(ser)
515 extracted[coord.lower()] = crd_data
516 self.logger.info(
517 f"loaded {len(crd_data)} files for coordinate <{coord}>")
518 return extracted
520 def transform(self, data):
521 sequences = data.pop("sequences")
522 subunits_classification = {}
523 # get dataframe for each coordinate
524 for coordinate, coord_dataset in data.items():
525 coordinate_df = self.coordinate_iteration(
526 sequences,
527 coordinate,
528 coord_dataset)
529 coordinate_df = coordinate_df.drop_duplicates(
530 subset=[self.unit_name])
531 coordinate_df, global_mean, global_std = self.add_modality_labels(
532 coordinate_df)
533 subunits_classification[coordinate] = [
534 coordinate_df, global_mean, global_std]
535 return subunits_classification
537 def coordinate_iteration(
538 self,
539 sequences,
540 coordinate,
541 coord_dataset):
542 coordinate_df = []
543 if self.duplicates:
544 trajectory_df = self.duplicate_trajectory_iteration(
545 coord_dataset,
546 sequences,
547 coordinate)
548 coordinate_df.append(trajectory_df)
549 else:
550 for seq, dataseries in zip(sequences, coord_dataset):
551 trajectory_df = self.trajectory_iteration(
552 dataseries,
553 seq,
554 coordinate)
555 coordinate_df.append(trajectory_df)
556 # concatenate all dataframes
557 coordinate_df = pd.concat(
558 coordinate_df,
559 axis=0)
560 return coordinate_df
562 def duplicate_trajectory_iteration(
563 self,
564 coord_dataset,
565 sequences,
566 coordinate):
567 trajectory_info = []
568 dict_all_subunits_ser = {}
569 # iterate over datasets and sequences
570 for dataseries, sequence in zip(coord_dataset, sequences):
571 # iterate over subunits
572 start = 2 + sequence.flanksize
573 end = sequence.size - (2 + sequence.baselen + sequence.flanksize - 1)
575 for idx in range(start, end):
576 subunit = sequence.get_subunit(idx)
577 ic_subunit = sequence.inverse_complement(subunit)
578 ser = dataseries[idx + 1]
579 ser = ser[~np.isnan(ser)].to_numpy()
581 if subunit in dict_all_subunits_ser:
582 dict_all_subunits_ser[subunit] = np.concatenate((dict_all_subunits_ser[subunit], ser))
583 else:
584 dict_all_subunits_ser[subunit] = ser
586 if ic_subunit not in dict_all_subunits_ser:
587 dict_all_subunits_ser[ic_subunit] = ser
589 for subunit, ser in dict_all_subunits_ser.items():
590 ser = ser.reshape(-1, 1)
591 subunit_information = self.subunit_iteration(
592 ser,
593 subunit,
594 coordinate)
595 if not self.bimod:
596 subunit_information["unimodal"] = True
597 trajectory_info.append(subunit_information)
599 # create dataframe from list of dictionaries
600 trajectory_df = pd.DataFrame.from_dict(trajectory_info)
601 return trajectory_df
603 def trajectory_iteration(
604 self,
605 dataseries,
606 sequence,
607 coordinate):
608 trajectory_info = []
609 # iterate over subunits
610 start = 2 + sequence.flanksize
611 end = sequence.size - (2 + sequence.baselen + sequence.flanksize - 1)
612 for idx in range(start, end):
613 # get unit and inverse-complement unit
614 subunit = sequence.get_subunit(idx)
615 ic_subunit = sequence.inverse_complement(subunit)
616 self.logger.info(
617 f"analyzing {self.unit_name} {subunit}/{ic_subunit}...")
618 # add 1 to idx since .ser table includes an index
619 ser = dataseries[idx + 1]
620 ser = ser[~np.isnan(ser)].to_numpy()
621 if ser.shape[0] < 2:
622 self.logger.info("skipping because of insufficient data!")
623 subunit_information = dict(
624 coordinate=coordinate,
625 binormal=False,
626 uninormal=True,
627 insuf_ev=False,
628 unimodal=True,
629 bics=[np.nan, np.nan],
630 mean1=np.nan,
631 mean2=np.nan,
632 var1=np.nan,
633 var2=np.nan,
634 w1=np.nan,
635 w2=np.nan)
636 subunit_information[self.unit_name] = subunit
637 else:
638 # reshape dataset
639 ser = ser.reshape(-1, 1)
640 subunit_information = self.subunit_iteration(
641 ser,
642 subunit,
643 coordinate)
644 if not self.bimod:
645 subunit_information["unimodal"] = True
646 trajectory_info.append(subunit_information)
647 # add inverse-complement
648 ic_subunit_information = subunit_information.copy()
649 ic_subunit_information[self.unit_name] = ic_subunit
650 trajectory_info.append(ic_subunit_information)
651 # create dataframe from list of dictionaries
652 trajectory_df = pd.DataFrame.from_dict(trajectory_info)
653 return trajectory_df
655 def subunit_iteration(
656 self,
657 ser,
658 subunit,
659 coordinate):
660 # model subunit's series
661 bibi = BiBiTransformer(max_iter=self.max_iter, tol=self.tol)
662 subunit_info = bibi.fit_transform(ser)
663 # add subunit name and coordinate to info dictionary
664 subunit_info[self.unit_name] = subunit
665 subunit_info["coordinate"] = coordinate
666 return subunit_info
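# Note: judging from how the result is used in trajectory_iteration() and
# add_modality_labels(), BiBiTransformer.fit_transform() is expected to return a dict
# with at least the keys: binormal, uninormal, insuf_ev, unimodal, bics, mean1, mean2,
# var1, var2, w1 and w2 (one or two Gaussian components plus their weights).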
668 def add_modality_labels(self, data):
669 df = data.set_index(self.unit_name)
671 # Series with global mean value for each tetramer
672 weighted_mean = df.apply(
673 lambda t: t["mean1"] if (
674 t["uninormal"] or t["insuf_ev"] or np.isnan(t["mean2"])
675 ) else (t["mean1"]*t["w1"]+t["mean2"]*t["w2"]),
676 axis=1)
678 # Series with comparison values
679 comparison_1 = df.apply(
680 lambda t: t["mean1"] if (
681 t["uninormal"] or t["insuf_ev"] or not t["unimodal"]
682 ) else t["mean1"]*t["w1"]+t["mean2"]*t["w2"],
683 axis=1)
685 # mean1: uninormal+unimodal
686 # u1*w1+u2*w2: binormal+unimodal
687 # mean2: binormal+bimodal
688 comparison_2 = df.apply(
689 lambda t: np.nan if t["unimodal"] else t["mean2"],
690 axis=1)
691 comparison_2 = comparison_2.fillna(comparison_1)
693 # get limiting values for col1 and col2
694 global_mean = weighted_mean.mean()
695 global_std = weighted_mean.std()
696 l1 = (global_mean + global_std)
697 l2 = (global_mean - global_std)
699 # 1: above mean+std (blue)
700 # 0: between mean-std and mean+std (white)
701 # -1: below mean-std (red)
702 col1 = comparison_1.apply(
703 lambda t: -1 if (t < l2) else (1 if (t > l1) else 0))
704 col1 = col1.rename("col1")
705 col2 = comparison_2.apply(
706 lambda t: -1 if (t < l2) else (1 if (t > l1) else 0))
707 col2 = col2.rename("col2")
709 df = pd.concat(
710 [df, col1, col2],
711 axis=1).reset_index()
713 return df, global_mean, global_std
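# Worked example of the labelling above: with a global weighted mean of 35.0 and a
# global std of 3.0 the thresholds are l1 = 38.0 and l2 = 32.0, so a comparison value
# of 39.0 gets label 1 (blue), 34.0 gets 0 (white) and 30.0 gets -1 (red).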
715 def make_tables(self, dataset, index=True):
716 for coordinate, dataset in dataset.items():
717 dataset[0].to_csv(
718 self.save_path / f"{coordinate}.csv", index=index)
720 def make_plots(self, dataset):
721 for coordinate, data_dict in dataset.items():
722 dataframes = data_dict[0]
723 global_mean = data_dict[1]
724 global_std = data_dict[2]
725 arlequin_plot(
726 dataframes,
727 global_mean,
728 global_std,
729 coordinate,
730 self.save_path,
731 base=self.Ac,
732 unit_name=self.unit_name,
733 unit_len=self.unit_len)
736# STIFFNESS ANALYSIS
737class StiffnessDistributions(Base):
739 def __init__(
740 self,
741 *args,
742 **kwargs):
743 super().__init__(
744 *args,
745 **kwargs)
747 def extract(self):
748 extracted = {}
749 sequences = []
750 for seq_file in self.sequence_files:
751 seq = load_sequence(
752 seq_file,
753 unit_name=self.unit_name,
754 unit_len=self.unit_len)
755 sequences.append(seq)
756 self.Ac = seq.Ac
757 self.logger.debug(f"Adenine complement set to: <{self.Ac}>")
758 extracted["sequences"] = sequences
759 self.logger.info(f"loaded {len(sequences)} sequences")
760 # Here the selection of coordinates is different: it depends on whether the unit length is even or odd
761 if (self.unit_len % 2) == 0:
762 stiff_coordinates = NASSA_ANALYSES_CANALS['bpcorr']
763 elif (self.unit_len % 2) == 1:
764 stiff_coordinates = NASSA_ANALYSES_CANALS['stiff']
765 for stiff_coord in stiff_coordinates:
766 for coord in self.coordinate_info.keys():
767 if coord == stiff_coord:
768 crd_data = []
769 for ser_file in self.coordinate_info[coord]:
770 ser = load_serfile(
771 ser_file,
772 self.tail,
773 self.n_lines)
774 crd_data.append(ser)
775 extracted[coord.lower()] = crd_data
776 self.logger.info(
777 f"loaded {len(crd_data)} files for coordinate <{coord}>")
778 return extracted
780 def transform(self, data):
781 sequences = data.pop("sequences")
782 results = {"stiffness": [], "covariances": {}, "constants": {}}
783 for traj, seq in enumerate(sequences):
784 traj_series = {coord.lower(): data[coord][traj]
785 for coord in data.keys()}
786 traj_results = self.get_stiffness(
787 seq,
788 traj_series)
789 results["stiffness"].append(traj_results["stiffness"])
790 results["covariances"].update(traj_results["covariances"])
791 results["constants"].update(traj_results["constants"])
792 stiffness_df = pd.concat(results["stiffness"], axis=0)
793 stiffness_df = stiffness_df.drop_duplicates(subset=[self.unit_name])
794 stiffness_df = stiffness_df.set_index(self.unit_name)
795 results["stiffness"] = stiffness_df
796 return results
798 def get_stiffness(
799 self,
800 sequence,
801 series_dict):
802 # get stiffness table for a given trajectory
803 coordinates = list(series_dict.keys())
804 results = {"stiffness": {}, "covariances": {}, "constants": {}}
805 diagonals = {}
806 start = 2 + sequence.flanksize
807 end = sequence.size - (2 + sequence.baselen + sequence.flanksize - 1)
808 for i in range(start, end):
809 tetramer = sequence.get_subunit(i)
810 ic_tetramer = sequence.inverse_complement(tetramer)
811 cols_dict = {coord: series_dict[coord][i+1]
812 for coord in series_dict.keys()}
813 stiffness_diag, cte, cov_df = self.get_subunit_stiffness(
814 cols_dict,
815 coordinates)
816 diagonals[tetramer] = np.append(
817 stiffness_diag,
818 [np.prod(stiffness_diag), np.sum(stiffness_diag)])
819 diagonals[ic_tetramer] = np.append(
820 stiffness_diag,
821 [np.prod(stiffness_diag), np.sum(stiffness_diag)])
822 #results["covariances"][tetramer] = cov_df
823 results["covariances"][ic_tetramer] = cov_df
824 #results["constants"][tetramer] = cte
825 results["constants"][ic_tetramer] = cte
826 # build stiffness table
827 columns = [sequence.unit_name] + coordinates + ["product", "sum"]
828 results["stiffness"] = pd.DataFrame.from_dict(
829 diagonals,
830 orient="index").reset_index()
831 results["stiffness"].columns = columns
832 return results
834 def get_subunit_stiffness(
835 self,
836 cols_dict,
837 coordinates,
838 scaling=[1, 1, 1, 10.6, 10.6, 10.6],
839 KT=0.592186827):
840 if (self.unit_len % 2) == 0:
841 SH_av = cols_dict["shift"].mean()
842 SL_av = cols_dict["slide"].mean()
843 RS_av = cols_dict["rise"].mean()
844 TL_av = self.circ_avg(cols_dict["tilt"])
845 RL_av = self.circ_avg(cols_dict["roll"])
846 TW_av = self.circ_avg(cols_dict["twist"])
847 elif (self.unit_len % 2) == 1:
848 SH_av = cols_dict["shear"].mean()
849 SL_av = cols_dict["stretch"].mean()
850 RS_av = cols_dict["stagger"].mean()
851 CW_av = cols_dict["chiw"].mean()
852 CC_av = cols_dict["chic"].mean()
853 TL_av = self.circ_avg(cols_dict["buckle"])
854 RL_av = self.circ_avg(cols_dict["propel"])
855 TW_av = self.circ_avg(cols_dict["opening"])
856 cols_arr = [cols_dict[coord] for coord in coordinates]
857 cols_arr = np.array(cols_arr).T
859 cv = ma.cov(ma.masked_invalid(cols_arr), rowvar=False)
860 cv.filled(np.nan)
862 cov_df = pd.DataFrame(cv, columns=coordinates, index=coordinates)
863 stiff = np.linalg.inv(cv) * KT
864 # Added two new variables: ChiC and ChiW -> 8 (for PENTAMERS)
865 if (self.unit_len % 2) == 0:
866 last_row = [SH_av, SL_av, RS_av, TL_av, RL_av, TW_av] #, CW_av, CC_av]
867 stiff = np.append(stiff, last_row).reshape(7, 6)
868 elif (self.unit_len % 2) == 1:
869 last_row = [SH_av, SL_av, RS_av, TL_av, RL_av, TW_av, CW_av, CC_av]
870 stiff = np.append(stiff, last_row).reshape(9, 8)
871 scaling=[1, 1, 1, 10.6, 10.6, 10.6, 1, 1]
873 stiff = stiff.round(6)
874 stiff_diag = np.diagonal(stiff) * np.array(scaling)
876 cte = pd.DataFrame(stiff)
877 cte.columns = coordinates
878 cte.index = coordinates + ["avg"]
879 return stiff_diag, cte, cov_df
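# In short: the stiffness matrix is the inverse of the (masked) covariance matrix of the
# helical coordinates scaled by KT (0.592186827 kcal/mol, i.e. kT at roughly room
# temperature), with the per-coordinate averages appended as the extra "avg" row; its
# diagonal, rescaled by `scaling`, is what get_stiffness() stores in the stiffness table.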
883 @staticmethod
884 def circ_avg(xarr, degrees=True):
885 n = len(xarr)
886 if degrees:
887 # convert to radians
888 xarr = xarr * np.pi / 180
889 av = np.arctan2(
890 (np.sum(np.sin(xarr)))/n,
891 (np.sum(np.cos(xarr)))/n) * 180 / np.pi
892 return av
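# Worked example: circ_avg(np.array([350.0, 10.0])) returns ~0.0 degrees, whereas a plain
# arithmetic mean would give the misleading value 180.0; this is why the angular
# coordinates above are averaged circularly.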
894 def unimod_labels(self, df):
895 # get limiting values for col1 and col2
896 global_mean = df.mean(axis=0)
897 global_std = df.std(axis=0)
898 l1 = global_mean + global_std
899 l2 = global_mean - global_std
901 # 1: above mean+std (blue)
902 # 0: between mean-std and mean+std (white)
903 # -1: below mean-std (red)
904 labeled_df = (df < l2) * -1 + (df > l1)
905 labeled_df.loc["g_mean"] = global_mean
906 labeled_df.loc["g_std"] = global_std
908 return labeled_df
910 def make_tables(self, dataset):
911 # stiffness
912 stiffness_data = dataset["stiffness"]
913 stiffness_data.to_csv(self.save_path / "stiffness.csv")
914 # covariances
915 covariances_path = self.save_path / "covariances"
916 covariances_path.mkdir(exist_ok=True)
917 for key, val in dataset["covariances"].items():
918 val.to_csv(covariances_path / f"{key}.csv")
919 # constants
920 constants_path = self.save_path / "constants"
921 constants_path.mkdir(exist_ok=True)
922 for key, val in dataset["constants"].items():
923 val.to_csv(constants_path / f"{key}.csv")
925 def make_plots(self, dataset):
926 stiffness_data = dataset["stiffness"]
927 labeled_df = self.unimod_labels(stiffness_data)
928 for col in labeled_df.columns:
929 df = labeled_df[col]
930 g_mean = df.loc["g_mean"]
931 g_std = df.loc["g_std"]
932 df = df.iloc[:-2]
933 df = df.rename("col1")
934 df = df.reset_index()
935 df["col2"] = df["col1"].copy()
936 arlequin_plot(
937 df,
938 g_mean,
939 g_std,
940 col,
941 self.save_path,
942 base=self.Ac,
943 unit_name=self.unit_name,
944 unit_len=self.unit_len)
946# Run the NASSA analysis pipeline given the name of the analysis and the configuration file.
947# The overwrite flag can be used to overwrite the output folder if it already exists.
948def run_nassa(analysis_name: str,
949 config_archive: dict,
950 overwrite_nassa: bool = False):
951 """Run the NASSA analysis pipeline"""
953 # Dictionary with the available NASSA analyses and their corresponding classes
954 analyses = {
955 "bpcorr": BasePairCorrelation,
956 "bconf": BConformations,
957 "crdcorr": CoordinateCorrelation,
958 "coordist": CoordinateDistributions,
959 "stiff": StiffnessDistributions
960 }
962 # We must check if the analysis name is valid
963 if analysis_name in analyses:
964 # A folder for each analysis has to be created
965 analyses_folder = os.path.join(config_archive["save_path"], analysis_name)
966 if not os.path.exists(analyses_folder):
967 os.mkdir(analyses_folder)
968 # The output folder is added to the configuration archive
969 config_archive["save_path"] = analyses_folder
970 else:
971 # If the output folder already exists, check whether it can be overwritten with the overwrite_nassa flag
972 if not os.listdir(analyses_folder) or overwrite_nassa:
973 config_archive["save_path"] = analyses_folder
974 print(f'WARNING: Output folder {analyses_folder} already exists. Overwriting it.')
975 else:
976 print(f'WARNING: Output folder {analyses_folder} already exists and is not empty. Skipping NASSA analysis. \nSet the overwrite flag to overwrite the output folder (-own).')
977 return
978 # The analysis is run
979 analysis_class = analyses[analysis_name]
980 # The stiffness analysis is a special case, since the coordinate files differ depending on whether the unit length is even or odd
981 if analysis_name == "stiff":
982 unit_len = config_archive["unit_len"] # Obtain the unit length from the configuration archive
983 if (unit_len % 2) == 0:
984 coordinate_files = NASSA_ANALYSES_CANALS["bpcorr"]
985 elif (unit_len % 2) == 1:
986 coordinate_files = NASSA_ANALYSES_CANALS["stiff"]
987 # The rest of the analyses have their corresponding coordinate files
988 else:
989 coordinate_files = NASSA_ANALYSES_CANALS[analysis_name]
990 # The coordinate files are filtered from the configuration archive because we only need the ones that correspond to the analysis
991 config_archive["coordinate_info"] = {
992 coord: config_archive["coordinate_info"][coord] for coord in coordinate_files}
993 # Call the analysis class with the configuration archive and run NASSA software
994 analysis_instance = analysis_class(**config_archive)
995 analysis_instance.run()
996 # If the analysis name is not valid, an error is raised
997 else:
998 raise ValueError(
999 f"{analysis_name} is not a valid analysis! "
1000 "Please choose from: "
1001 f"{list(analyses.keys())}")
1004def workflow_nassa(
1005 config_file_path: Optional[str],
1006 analysis_names: Optional[List[str]],
1007 make_config: bool = False,
1008 output: Optional[str] = None,
1009 working_directory: str = '.',
1010 overwrite: bool = False,
1011 overwrite_nassa: bool = False,
1012 helical_par: bool = False,
1013 proj_dirs: Optional[List[str]] = None,
1014 input_structure_file: Optional[str] = None,
1015 input_trajectory_file: Optional[str] = None,
1016 input_top_file: Optional[str] = None,
1017 all: bool = False,
1018 unit_len: int = 6,
1019 n_sequences: Optional[int] = '*',
1020 seq_path: Optional[str] = None,
1021 md_directories: Optional[List[str]] = None,
1022 trust: bool = False,
1023 mercy: bool = False,
1024 ):
1026 # Change to the working directory and print the name of the directory to inform the user
1027 chdir(working_directory)
1028 current_directory_name = getcwd().split('/')[-1]
1029 print(f'\n{BLUE_HEADER}Running NASSA at {current_directory_name}{COLOR_END}')
1030 # If the user selects the helical parameter analysis, we need to call the workflow function from model_workflow to run it
1031 if helical_par:
1032 # AGUS: this glob-based option is commented out because it does not work properly; if it is needed in the future it should be fixed
1033 # AGUS: the goal of using glob is to run this part (over the project directories) by calling this function from Python rather than from the command line
1034 #seq_paths = glob.glob(f'{proj_dirs}')
1035 #seq_paths = [path for path in glob.glob(f'{proj_dirs}*') if os.path.isdir(path)]
1037 actual_path = getcwd()
1038 # The workflow function is imported from model_workflow to run the helical parameters analysis and do all the checks in each project directory
1039 # This way, we assume that each project directory is an independent project or MD simulation
1040 from model_workflow.mwf import workflow
1041 # Iterate over the project directories
1042 for proj_path in proj_dirs:
1043 # Obtain the path of the project directory and change to it
1044 md_path = os.path.join(actual_path, proj_path)
1045 os.chdir(md_path)
1046 print(f'\n{CYAN_HEADER}Running Helical Parameters at {proj_path}{COLOR_END}')
1047 # Call the workflow function to run the helical parameters analysis with the include flag set to helical and overwrite (if it is added)
1048 workflow(project_parameters={#'directory':os.getcwd(),
1049 #'md_directories':[os.getcwd()],
1050 #'input_structure_filepath':input_structure_file,
1051 'input_topology_filepath':input_top_file,
1052 'input_trajectory_filepaths':input_trajectory_file,
1053 'md_directories': md_directories,
1054 'trust': trust,
1055 'mercy': mercy
1056 },
1057 include=['helical'],
1058 overwrite=overwrite
1059 )
1060 os.chdir('..')
1061 # If the all flag is selected, the NASSA analysis will be run after the helical parameters analysis
1062 # Reminder: the NASSA analysis needs the helical parameters files to be generated before running it.
1063 if all:
1064 # The sequence path is needed to run the NASSA analysis, so it has to be defined
1065 if seq_path is None:
1066 raise InputError('No sequence path defined. Please define it as an argument (--seq_path)')
1067 output_nassa_analysis = os.path.join(actual_path, 'nassa_analysis')
1068 # If the output folder already exists, check whether it can be overwritten with the overwrite_nassa flag
1069 if not os.path.exists(output_nassa_analysis):
1070 os.mkdir(output_nassa_analysis)
1071 else:
1072 if overwrite_nassa:
1073 print(f'WARNING: Output folder {output_nassa_analysis} already exists. Overwriting it.')
1074 else:
1075 print(f'WARNING: Output folder {output_nassa_analysis} already exists and is not empty. Skipping NASSA analysis. \nSet the overwrite flag to overwrite the output folder (--overwrite_nassa).')
1076 return
1077 # Generate the NASSA configuration file
1078 generate_nassa_config(
1079 folder_path=proj_dirs,
1080 seq_path=seq_path,
1081 output_path=output_nassa_analysis,
1082 unit_len=unit_len,
1083 n_sequences=n_sequences,
1084 )
1085 print(f'NASSA configuration file generated at {output_nassa_analysis}')
1086 config_file_path = os.path.join(output_nassa_analysis, 'nassa.yml')
1087 # The user can select the analysis to run
1088 if analysis_names is not None:
1089 for analysis_name in analysis_names:
1090 print(f' Running analysis {analysis_name}')
1091 # Read the configuration file
1092 try:
1093 with Path(config_file_path).open("r") as ymlfile:
1094 config_archive = yaml.load(
1095 ymlfile, Loader=yaml.FullLoader)
1096 except FileNotFoundError:
1097 raise FileNotFoundError(
1098 f"Configuration file {config_file_path} not found!")
1099 # Run the NASSA analysis with the selected analysis
1100 run_nassa(analysis_name=analysis_name,
1101 config_archive=config_archive,
1102 overwrite_nassa=overwrite_nassa)
1103 print(f'NASSA analysis completed at {output_nassa_analysis}')
1104 return
1105 # If the user does not select any analysis, all the analyses will be run
1106 for analysis_name in NASSA_ANALYSES_CANALS.keys():
1107 print(f' Running analysis {analysis_name}')
1108 # Read the configuration file
1109 try:
1110 with Path(config_file_path).open("r") as ymlfile:
1111 config_archive = yaml.load(
1112 ymlfile, Loader=yaml.FullLoader)
1113 except FileNotFoundError:
1114 raise FileNotFoundError(
1115 f"Configuration file {config_file_path} not found!")
1116 # Run the NASSA analysis with the selected analysis
1117 run_nassa(analysis_name=analysis_name,
1118 config_archive=config_archive,
1119 overwrite_nassa=overwrite_nassa)
1120 print(f'NASSA analysis completed at {output_nassa_analysis}')
1121 return
1122 # If the helical parameter analysis is not selected, the NASSA analysis will be run
1123 # To run this, the user has to provide the configuration file with the information needed
1124 if config_file_path and analysis_names is not None:
1125 config_file_path = Path(config_file_path)
1126 print(f' Using config file {config_file_path}')
1127 # Load the configuration file
1128 try:
1129 with Path(config_file_path).open("r") as ymlfile:
1130 config_archive = yaml.load(
1131 ymlfile, Loader=yaml.FullLoader)
1132 except FileNotFoundError:
1133 raise FileNotFoundError(
1134 f"Configuration file {config_file_path} not found!")
1135 # The user can select the analysis to run
1136 for analysis_name in analysis_names:
1137 # The output folder is taken from the --output argument if provided, otherwise from the configuration file
1138 if output is not None:
1139 output_path = os.path.join(output, analysis_name)
1140 config_archive['save_path'] = output_path
1141 else:
1142 if 'save_path' in config_archive:
1143 output_path = os.path.join(config_archive['save_path'], analysis_name)
1144 config_archive['save_path'] = output_path
1145 else:
1146 raise InputError('No output path defined. Please define it in the configuration file or pass it as an argument (--output)')
1148 # Check that the user has created the configuration file with the needed information:
1149 # the coordinate files required by the selected analysis must be defined
1150 canal_files_config = config_archive['coordinate_info']
1151 needed_files = NASSA_ANALYSES_CANALS[analysis_name]
1152 for file in needed_files:
1153 # If any required coordinate is missing from the configuration file, raise an error
1154 if file not in canal_files_config:
1155 raise InputError(f'Analysis {analysis_name} requires the {file} coordinate files to be defined in the configuration file')
1157 # The check for the output folder is done here
1158 if os.path.exists(output_path):
1159 if not os.listdir(output_path) or overwrite_nassa:
1160 print(f' Output folder {output_path} already exists. Overwriting analysis {analysis_name}')
1161 run_nassa(analysis_name, config_archive, overwrite_nassa)
1162 else:
1163 print(f' Output folder {output_path} already exists. Skipping analysis. \nSet the overwrite flag to overwrite the output folder (--overwrite_nassa).')
1164 continue
1165 else:
1166 os.mkdir(output_path)
1167 print(f' Running analysis {analysis_name} and saving results in {output_path}')
1168 run_nassa(analysis_name, config_archive, overwrite_nassa)
1169 if all:
1170 for analysis_name in NASSA_ANALYSES_CANALS.keys():
1171 print(f"{GREEN_HEADER} |-----> Running analysis {analysis_name}{COLOR_END}")
1172 # Read the configuration file
1173 try:
1174 with Path(config_file_path).open("r") as ymlfile:
1175 config_archive = yaml.load(
1176 ymlfile, Loader=yaml.FullLoader)
1177 except FileNotFoundError:
1178 raise FileNotFoundError(
1179 f"Configuration file {config_file_path} not found!")
1180 # Check if the NASSA output folder exists
1181 output_path = os.path.join(config_archive['save_path'], 'nassa_analysis')
1182 config_archive['save_path'] = output_path
1183 if not os.path.exists(output_path):
1184 os.makedirs(output_path)
1185 # Check if the analysis output folder exists in NASSA output folder
1186 if not os.path.exists(os.path.join(output_path, analysis_name)):
1187 os.makedirs(os.path.join(output_path, analysis_name))
1188 # config_archive['save_path'] = os.path.join(output_path, analysis_name)
1189 # Run the NASSA analysis with the selected analysis
1190 run_nassa(analysis_name=analysis_name,
1191 config_archive=config_archive,
1192 overwrite_nassa=overwrite_nassa)
1193 print(f'NASSA analysis completed at {current_directory_name}')