Coverage for model_workflow/utils/nassa_file.py: 61%

77 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1import os 

2from os import chdir, getcwd 

3import glob 

4import json 

5from pathlib import Path 

6from typing import List 

7from model_workflow.utils.constants import NASSA_ANALYSES_CANALS 

8 

def generate_nassa_config(
    folder_path: List[str],
    seq_path: str,
    output_path: str,
    unit_len: int,
    n_sequences: int,
):
    """Build a NASSA configuration and write it to ``nassa.yml``.

    The configuration is a dict holding fixed analysis defaults plus the
    sequence files and per-coordinate ``.ser`` archives discovered on disk.
    It is serialized with ``json.dump`` (the output file is named
    ``nassa.yml`` but its content is JSON).

    Args:
        folder_path: Directories, relative to the current working directory,
            to inspect. When a directory contains a ``helical`` subfolder,
            its ``.ser`` files are collected; otherwise the current working
            directory is scanned for subfolders named after a coordinate.
        seq_path: Directory holding the sequence files (``.fasta`` / ``.fa``).
            Falsy to skip sequence discovery.
        output_path: Directory where ``nassa.yml`` is written and which is
            stored (as an absolute path) under ``save_path``. Falsy to use
            the current directory.
        unit_len: Overrides the default ``unit_len`` of 6 when truthy.
        n_sequences: Maximum number of sequence files, and of archives per
            coordinate, to keep. Falsy for no limit.

    Returns:
        The path of the written configuration file.

    Raises:
        ValueError: If the coordinates do not all have the same number of
            archives, or if the number of sequence files still differs from
            the number of archives after truncation (this includes the case
            where no coordinate archive was found at all).
    """
    nassa_config = {
        'unit_name': 'hexamer',
        'unit_len': 6,
        'n_lines': 5000,
        'tail': True,
        'bimod': True,
        'save_tables': True,
        'save_plots': True,
        'save_path': None,
        'sequence_files': [],
        'coordinate_info': {},
    }
    coordinate_info = nassa_config['coordinate_info']

    # If the sequence path is given, use it as the base path to search the sequences
    if seq_path:
        seq_path = os.path.abspath(seq_path)
        found_sequences = nassa_config['sequence_files']
        # os.listdir returns entries in arbitrary order: sort so that the files
        # kept under the n_sequences cap are always the same ones
        for sequence_file in sorted(os.listdir(seq_path)):
            # We assume that the sequences are in fasta format
            # AGUS: I think we should allow for other formats as: .seq, .txt
            if sequence_file.endswith(('.fasta', '.fa')):
                found_sequences.append(os.path.join(seq_path, sequence_file))
                if n_sequences and len(found_sequences) == n_sequences:
                    break

    # Use the output path as the save path, falling back to the current path
    if output_path:
        nassa_config['save_path'] = os.path.abspath(output_path)
    else:
        nassa_config['save_path'] = os.path.abspath('.')

    # If the unit length is given, it replaces the default one
    if unit_len:
        nassa_config['unit_len'] = unit_len

    # Every coordinate name required by any of the NASSA analyses (built once)
    coordinates = {
        coordinate
        for analysis_coordinates in NASSA_ANALYSES_CANALS.values()
        for coordinate in analysis_coordinates
    }

    # The configuration is created assuming that the coordinates hang from the
    # current working directory (helical folder of each given path)
    # AGUS: more cases should be explored, not sure this always holds
    actual_path = getcwd()

    for path in folder_path:
        md_path = os.path.join(actual_path, path)
        helical_path = os.path.join(md_path, 'helical')
        if os.path.exists(helical_path):
            # Canals + Curves outputs already exist: collect the .ser archives
            # whose coordinate is listed in NASSA_ANALYSES_CANALS
            for ser_file in sorted(os.listdir(helical_path)):
                if not ser_file.endswith('.ser'):
                    continue
                # The coordinate is taken from the third '_'-separated field
                name_parts = ser_file.split('_')
                if len(name_parts) < 3:
                    # Name does not follow the expected pattern: skip it
                    # instead of failing with an IndexError
                    continue
                ser_coordinate = name_parts[2].replace('.ser', '')
                if ser_coordinate not in coordinates:
                    continue
                archives = coordinate_info.setdefault(ser_coordinate, [])
                # Enforce the cap before appending, so no coordinate ever
                # holds more than n_sequences archives
                if n_sequences and len(archives) >= n_sequences:
                    continue
                archives.append(os.path.join(helical_path, ser_file))
        else:
            # No helical folder: look in the current working directory for
            # folders named after a coordinate, each one holding its .ser files
            # NOTE(review): this branch scans the working directory, not
            # md_path, and resets any archives already collected for the
            # coordinate - confirm this is intended
            # AGUS: these files may exist in different folders, so maybe all
            # folders should be searched?
            for folder in sorted(os.listdir(actual_path)):
                coordinate_dir = os.path.join(actual_path, folder)
                if folder not in coordinates or not os.path.isdir(coordinate_dir):
                    continue
                archives = []
                coordinate_info[folder] = archives
                for ser_file in sorted(os.listdir(coordinate_dir)):
                    if ser_file.endswith('.ser'):
                        archives.append(os.path.join(coordinate_dir, ser_file))
                        if n_sequences and len(archives) == n_sequences:
                            break

    # Every coordinate must provide the same number of archives
    num_archives = None
    for archives in coordinate_info.values():
        count = len(archives)
        if num_archives is None:
            num_archives = count
        elif count != num_archives:
            # Report the count of every coordinate before failing
            for coordinate, coordinate_archives in coordinate_info.items():
                print(f"Number of coordinate archives in {coordinate}: {len(coordinate_archives)}")
            raise ValueError("Not all coordinate archives have the same number")

    # There may be fewer .ser archives than sequence files: keep only the
    # first num_archives sequence files, once sorted
    if len(nassa_config['sequence_files']) != num_archives:
        nassa_config['sequence_files'].sort()
        nassa_config['sequence_files'] = nassa_config['sequence_files'][:num_archives]

    # Still different (e.g. fewer sequences than archives, or no archives found)
    if len(nassa_config['sequence_files']) != num_archives:
        print(f"Number of sequence files: {len(nassa_config['sequence_files'])}")
        print(f"Number of coordinate archives: {num_archives}")
        raise ValueError("The number of sequence files is not the same as the number of coordinate archives")

    # Save nassa_config as JSON (the file keeps the .yml name)
    if output_path:
        save_path = os.path.join(output_path, 'nassa.yml')
    else:
        save_path = os.path.join(os.path.abspath('.'), 'nassa.yml')

    with open(save_path, 'w') as f:
        json.dump(nassa_config, f)

    return save_path