Coverage for model_workflow/utils/nassa_file.py: 61%
77 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1import os
2from os import chdir, getcwd
3import glob
4import json
5from pathlib import Path
6from typing import List
7from model_workflow.utils.constants import NASSA_ANALYSES_CANALS
def generate_nassa_config(
    folder_path: List[str],
    seq_path: str,
    output_path: str,
    unit_len: int,
    n_sequences: int,
) -> str:
    """Build a NASSA configuration and write it to a ``nassa.yml`` file.

    The configuration lists the sequence files found in ``seq_path`` and, for
    every coordinate named in ``NASSA_ANALYSES_CANALS``, the ``.ser`` archives
    found for each entry of ``folder_path``.

    Args:
        folder_path: Directories, relative to the current working directory,
            to inspect. When a directory contains a ``helical`` folder, its
            ``.ser`` files are used; they are expected to be named
            ``<a>_<b>_<coordinate>.ser``. Otherwise the sub-folders of the
            current working directory whose name is a coordinate are scanned.
        seq_path: Directory holding the ``.fasta`` / ``.fa`` sequence files.
            When falsy, no sequence file is collected.
        output_path: Directory where ``nassa.yml`` is written. When falsy,
            the current working directory is used.
        unit_len: When truthy, overrides the default ``unit_len`` of 6.
            ``unit_name`` stays ``'hexamer'`` regardless.
        n_sequences: When truthy, upper bound on the number of sequence files
            and on the number of archives kept per coordinate.

    Returns:
        The path of the written configuration file.

    Raises:
        ValueError: If the coordinates do not all have the same number of
            archives, or if the number of sequence files cannot be matched
            to the number of archives (including when no archive was found).
    """
    nassa_config = {
        'unit_name': 'hexamer',
        'unit_len': 6,
        'n_lines': 5000,
        'tail': True,
        'bimod': True,
        'save_tables': True,
        'save_plots': True,
        'save_path': None,
        'sequence_files': [],
        'coordinate_info': {},
    }
    sequence_files = nassa_config['sequence_files']
    coordinate_info = nassa_config['coordinate_info']

    # Collect the sequence files found directly in the given sequence folder.
    if seq_path:
        seq_path = os.path.abspath(seq_path)
        # os.listdir returns entries in arbitrary order: sort them so that the
        # files kept when n_sequences applies do not depend on the filesystem.
        for sequence_file in sorted(os.listdir(seq_path)):
            # We assume that the sequences are in fasta format
            # AGUS: I think we should allow for other formats as: .seq, .txt
            if not sequence_file.endswith(('.fasta', '.fa')):
                continue
            sequence_files.append(os.path.join(seq_path, sequence_file))
            if n_sequences and len(sequence_files) == n_sequences:
                break

    # Results are saved in the output path, or in the current path by default.
    nassa_config['save_path'] = os.path.abspath(output_path if output_path else '.')

    if unit_len:
        nassa_config['unit_len'] = unit_len

    # Every coordinate of interest according to NASSA_ANALYSES_CANALS.
    coordinates = {
        coordinate
        for values in NASSA_ANALYSES_CANALS.values()
        for coordinate in values
    }

    # The configuration is built assuming the coordinates hang from the
    # current working directory (helical_parameters folder).
    # AGUS: more cases should be explored, not sure this always holds.
    actual_path = getcwd()
    for path in folder_path:
        md_path = os.path.join(actual_path, path)
        helical_path = os.path.join(md_path, 'helical')
        if os.path.exists(helical_path):
            # Canals + Curves outputs were already computed: reuse the .ser
            # archives whose coordinate is one of the coordinates of interest.
            for ser_file in sorted(os.listdir(helical_path)):
                if not ser_file.endswith('.ser'):
                    continue
                name_parts = ser_file.split('_')
                # Names without a third '_'-separated field carry no
                # coordinate: skip them instead of failing with IndexError.
                if len(name_parts) < 3:
                    continue
                coordinate = name_parts[2].replace('.ser', '')
                if coordinate not in coordinates:
                    continue
                archives = coordinate_info.setdefault(coordinate, [])
                # Enforce the per-coordinate limit. The check must happen
                # before appending; a trailing 'continue' would be a no-op.
                if n_sequences and len(archives) >= n_sequences:
                    continue
                archives.append(os.path.join(helical_path, ser_file))
        else:
            # No helical folder: the archives are looked up in the folders of
            # the current path, where each folder is named after a coordinate.
            # Any archive list previously stored for that coordinate is reset.
            # AGUS: these files may exist in several folders, so maybe every
            # folder should be searched?
            for folder in sorted(os.listdir(actual_path)):
                coordinate_path = os.path.join(actual_path, folder)
                if folder not in coordinates or not os.path.isdir(coordinate_path):
                    continue
                archives = []
                coordinate_info[folder] = archives
                for ser_file in sorted(os.listdir(coordinate_path)):
                    if not ser_file.endswith('.ser'):
                        continue
                    archives.append(os.path.join(coordinate_path, ser_file))
                    if n_sequences and len(archives) == n_sequences:
                        break

    # Every coordinate must provide the same number of archives.
    archive_counts = {
        coordinate: len(archives)
        for coordinate, archives in coordinate_info.items()
    }
    if len(set(archive_counts.values())) > 1:
        for coordinate, count in archive_counts.items():
            print(f"Number of coordinate archives in {coordinate}: {count}")
        raise ValueError("Not all coordinate archives have the same number")
    # None when no coordinate archive was found at all.
    num_archives = next(iter(archive_counts.values()), None)

    # There may be fewer .ser archives than sequence files: keep only the
    # first sequence files, in sorted order, to match the archive count.
    if len(sequence_files) != num_archives:
        sequence_files.sort()
        del sequence_files[num_archives:]

    # Still mismatched: fewer sequences than archives, or no archives at all.
    if len(sequence_files) != num_archives:
        print(f"Number of sequence files: {len(sequence_files)}")
        print(f"Number of coordinate archives: {num_archives}")
        raise ValueError("The number of sequence files is not the same as the number of coordinate archives")

    # The configuration is dumped as JSON under a .yml file name.
    if output_path:
        save_path = os.path.join(output_path, 'nassa.yml')
    else:
        save_path = os.path.join(os.path.abspath('.'), 'nassa.yml')

    with open(save_path, 'w') as f:
        json.dump(nassa_config, f)

    return save_path