Coverage for model_workflow/tools/nassa_loaders.py: 60%

62 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1import pathlib 

2from collections import deque 

3from model_workflow.utils.nucleicacid import NucleicAcid 

4import pandas as pd 

5 

def load_sequence(seqfile, unit_len, unit_name=None):
    """
    Load a text file containing the forward and, optionally, the inverse-complementary sequence.

    The file content is split on whitespace: the first token is used as the
    forward sequence and the second one, if present, as the
    inverse-complementary sequence. When the file holds a single token,
    ``None`` is passed as the inverse-complementary sequence.

    :param seqfile: path to the sequences file.
    :type seqfile: str or pathlib.Path
    :param int unit_len: length of subunit to be analyzed.
    :param str unit_name: name of subunit to be analyzed.
    :returns: NucleicAcid object for the loaded sequence.
    :raises AssertionError: if ``seqfile`` is neither ``str`` nor
        ``pathlib.Path``, if the file contains no sequence, or if it contains
        more than two whitespace-separated tokens.
    """
    # The checks raise AssertionError explicitly instead of using ``assert``:
    # the exception type stays the same for callers, but the validation is no
    # longer stripped when Python runs with -O.
    if not isinstance(seqfile, (str, pathlib.Path)):
        raise AssertionError(
            f"seqfile must be a str or pathlib.Path, got {type(seqfile).__name__}")
    sequences = pathlib.Path(seqfile).read_text().split()
    if not sequences:
        raise AssertionError("Error in sequence file! Check its not empty")
    if len(sequences) > 2:
        raise AssertionError(
            "Error in sequence file! Expected at most 2 sequences, "
            f"found {len(sequences)}")
    # A single-sequence file has no inverse-complementary strand to pass on.
    ic_sequence = sequences[1] if len(sequences) == 2 else None
    return NucleicAcid(
        sequence=sequences[0],
        ic_sequence=ic_sequence,
        unit_name=unit_name,
        unit_len=unit_len)

29 

# Counterpart of load_sequence that receives the sequence itself instead of a file path.
# It is used in the helical parameters analysis.
def load_sequence2(sequence, unit_len, unit_name='hexamer'):
    """
    Build a NucleicAcid from a forward sequence given as a string.

    The inverse-complementary sequence is not supplied by the caller: it is
    computed with ``reverse_sequence`` called with its default arguments.

    :param str sequence: forward sequence.
    :param int unit_len: length of subunit to be analyzed.
    :param str unit_name: name of subunit to be analyzed (defaults to ``'hexamer'``).
    :returns: NucleicAcid object for the given sequence.
    """
    # The original built a one-element list and then re-checked its length;
    # those checks could never fail, so both sequences are passed directly.
    return NucleicAcid(
        sequence=sequence,
        ic_sequence=reverse_sequence(sequence),
        unit_name=unit_name,
        unit_len=unit_len)

# This function is used to reverse the sequence and obtain the inverse complement.
def reverse_sequence(sequence, DNA=True):
    """
    Return the inverse complement of a nucleotide sequence.

    The sequence is traversed from its last base to its first one and every
    base is replaced by its complementary base (A pairs with T for DNA or with
    U for RNA; G pairs with C).

    :param str sequence: sequence made of upper-case bases (``A``, ``C``, ``G``
        and ``T`` for DNA or ``U`` for RNA).
    :param bool DNA: if True (default) use ``T`` as the partner of ``A``;
        otherwise use ``U``.
    :returns str: the inverse-complementary sequence.
    :raises KeyError: if the sequence contains a base outside the expected
        alphabet (e.g. lower-case letters, or ``T`` when ``DNA`` is False).
    """
    # Partner of adenine: thymine in DNA, uracil in RNA.
    a_partner = "T" if DNA else "U"
    complement = {"A": a_partner, "G": "C", "C": "G", a_partner: "A"}
    try:
        # join() builds the result in one pass instead of repeated string +=.
        return "".join(complement[base] for base in reversed(sequence))
    except KeyError as err:
        kind = "DNA" if DNA else "RNA"
        raise KeyError(
            f"Unsupported base {err.args[0]!r} in {kind} sequence") from None

69 

def write_sequence(nucleic_acid, filename):
    """
    Write the sequences of a NucleicAcid object to a text file.

    The forward sequence goes on the first line and the inverse-complementary
    sequence on the second one. When ``ic_sequence`` is ``None`` only the
    forward sequence is written, so the literal text ``None`` never ends up in
    the file where it could be read back as a sequence.

    :param NucleicAcid nucleic_acid: object whose ``sequence`` and
        ``ic_sequence`` attributes are written.
    :param filename: path of the output file.
    :raises AssertionError: if ``nucleic_acid`` is not a NucleicAcid instance.
    """
    # Raised explicitly (same exception type as the former ``assert``) so the
    # check is not stripped when Python runs with -O.
    if not isinstance(nucleic_acid, NucleicAcid):
        raise AssertionError(
            f"nucleic_acid must be a NucleicAcid, got {type(nucleic_acid).__name__}")
    output = f"{nucleic_acid.sequence}"
    if nucleic_acid.ic_sequence is not None:
        output += f"\n{nucleic_acid.ic_sequence}"
    pathlib.Path(filename).write_text(output)

74 

75 

76 

def load_serfile(ser_file, tail=True, n_lines=None):
    """
    Load single file containing a coordinate's series.

    The file is parsed as whitespace-separated columns without a header; the
    first column is used as the index of the returned table.

    :param str ser_file: path to .ser file.
    :param bool tail: (Default True) read the last ``n_lines`` of the file. Otherwise, read the first ``n_lines``.
    :param int n_lines: number of rows to read. If None, the whole file is read.
    :returns pandas.DataFrame: .ser file converted into a pandas.DataFrame table
    """
    if not tail:
        extra_kwargs = dict(nrows=n_lines)
    elif n_lines is None:
        # Nothing to trim: no need to scan the file for its last line.
        extra_kwargs = dict()
    else:
        # Keep only the last non-blank line; a trailing empty line (or an
        # empty file) must not break the lookup of the last index.
        with open(ser_file, "r") as f:
            last_line = deque((line for line in f if line.strip()), maxlen=1)
        last_index = last_line[0].split()[0] if last_line else ""
        if last_index.isdigit():
            # NOTE(review): the index of the last row is used as the total
            # number of rows, which assumes indices are consecutive and start
            # at 1 - confirm against the .ser files in use.
            extra_kwargs = dict(skiprows=max(0, int(last_index) - n_lines))
        else:
            # Last index is not a plain integer: fall back to reading everything.
            extra_kwargs = dict()
    ser_data = pd.read_csv(
        ser_file,
        header=None,
        sep=r'\s+',  # raw string: '\s' is an invalid escape in a normal literal
        index_col=0,
        **extra_kwargs)
    return ser_data

108 

def write_serfile(data, filename, indent=8, decimals=2, transpose=True):
    """Write data to same format as .ser file.

    By default, data is assumed to be in shape (n_cols, n_frames), and it's
    written in 8-character-wide, right-aligned columns. The first column of
    every row is written as an integer; the remaining values are written in
    fixed-point notation with ``decimals`` decimal places.

    :param numpy.ndarray data: output data
    :param str filename: dataset's filename
    :param indent: width of columns, defaults to 8
    :type indent: int, optional
    :param decimals: number of rounding decimals, defaults to 2
    :type decimals: int, optional
    :param transpose: transpose data array before writing. It should be used so array shape is (n_frames, n_cols). Defaults to True
    :type transpose: bool, optional
    """
    if transpose:
        data = data.T
    with open(filename, "w") as f:
        # The width and precision must be nested replacement fields
        # (``{indent}``, ``{decimals}``): a bare ``:>indent`` is taken as the
        # literal format spec "indent" and raises ValueError.
        f.writelines(
            f"{int(row[0]):>{indent}}"
            + "".join(f"{elem:>{indent}.{decimals}f}" for elem in row[1:])
            + "\n"
            for row in data)