Coverage for model_workflow/utils/filters.py: 60%

85 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1from model_workflow.utils.auxiliar import InputError 

2from model_workflow.utils.formats import get_format_set_suitable_function 

3from model_workflow.utils.gmx_spells import filter_structure, filter_trajectory, filter_tpr 

4from model_workflow.utils.pyt_spells import filter_topology 

5from model_workflow.utils.structures import Structure 

6from model_workflow.utils.type_hints import * 

7from model_workflow.utils.file import File 

8from model_workflow.tools.conversions import convert 

9from inspect import getfullargspec 

10 

11# Set functions to performe structure conversions 

12# These functions must have 'input_structure_file' and 'output_structure_file' keywords 

13# These functions must have the 'format_sets' property 

14# These functions may have the 'input_trajectory_file' keyword 

15structure_filtering_functions = [ filter_structure, filter_tpr, filter_topology ] 

16 

17# Set functions to performe trajectory conversions 

18# These functions must have 'input_trajectory_file' and 'output_trajectory_file' keywords 

19# These functions must have the 'format_sets' property 

20trajectory_filtering_functions = [ filter_trajectory ] 

21 

22# Auxiliar PDB file used to instatiate the reference structure sometimes 

23AUXILIAR_PDB_FILE = File('.auxiliar.pdb') 

24 

25# Handle filterings of different structure and trajectory formats 

26def filter_atoms ( 

27 input_structure_file : Optional['File'] = None, 

28 output_structure_file : Optional['File'] = None, 

29 input_trajectory_file : Optional['File'] = None, 

30 output_trajectory_file : Optional['File'] = None, 

31 selection_string : Optional[str] = None, 

32 selection_syntax : str = 'vmd' 

33): 

34 

35 # If we have no output at all then there is nothing to do 

36 if not output_structure_file and not output_trajectory_file: 

37 raise InputError('No output structure neither output trajectory was specified') 

38 # If we have output but not input we must complain 

39 if output_structure_file and not input_structure_file: 

40 raise InputError('Missing input structure when output structure is requested') 

41 if output_trajectory_file and not input_trajectory_file: 

42 raise InputError('Missing input trajectory when output trajectory is requested') 

43 

44 # Check input files to exist 

45 for input_file in [ input_structure_file, input_trajectory_file ]: 

46 if input_file and not input_file.exists: 

47 raise InputError('Could not find input file ' + input_file.path) 

48 

49 # Check we are not missing additional inputs 

50 if not selection_string: 

51 print('Missing input selection string -> Water and counter ions will be filtered') 

52 elif not selection_syntax: 

53 raise InputError('Missing input selection syntax') 

54 

55 # We need a PDB to instantiate the reference structure 

56 # The structure file may also be instantiated from some topology file formats but this is not much reliable 

57 # If the input structure is not a PDB then we make a conversion to generate it 

58 structure_filepath = input_structure_file.path 

59 if input_structure_file.format != 'pdb': 

60 # To do so we need coordinates so make sure they were passed 

61 if not input_trajectory_file: 

62 raise InputError('In order to filter we need coordinates since some atom selections may rely in close atoms.\n' + 

63 ' Your input structure is a topology thus not having coordinates.\n' + 

64 ' Please provide a trajectory file as well') 

65 # Generate a PDB file using both input topology and trajectory 

66 structure_filepath = AUXILIAR_PDB_FILE.path 

67 convert(input_structure_filepath=input_structure_file.path, 

68 output_structure_filepath=structure_filepath, 

69 input_trajectory_filepaths=input_trajectory_file.path) 

70 

71 # Parse the selection 

72 selection = None 

73 # Hope the input structure is in a supported format 

74 structure = Structure.from_file(structure_filepath) 

75 if selection_string: 

76 # Parse the input selection 

77 selection = structure.select(selection_string, syntax=selection_syntax) 

78 # If the selection is empty then war the user 

79 if not selection: 

80 raise InputError(f'Selection {selection_string} is empty') 

81 else: 

82 # If the selection is missing then filter out water and ions by default 

83 water_selection = structure.select_water() 

84 counter_ions_selection = structure.select_counter_ions() 

85 filter_selection = water_selection + counter_ions_selection 

86 selection = structure.invert_selection(filter_selection) 

87 # If the selection is empty then war the user 

88 if not selection: 

89 raise InputError('There are no water or counter ions') 

90 

91 # Remove the axuiliar PDB file, if exists 

92 if AUXILIAR_PDB_FILE.exists: AUXILIAR_PDB_FILE.remove() 

93 

94 # Check if any input file has an non-standard extension of a supported format 

95 # If so then we create a symlink with the standard extension 

96 # Save created symlinks to remove them at then of the process 

97 symlink_files = [] 

98 if input_structure_file and input_structure_file.extension != input_structure_file.format: 

99 input_structure_file = input_structure_file.get_standard_file() 

100 symlink_files.append(input_structure_file) 

101 if input_trajectory_file and input_trajectory_file.extension != input_trajectory_file.format: 

102 input_trajectory_file = input_trajectory_file.get_standard_file() 

103 symlink_files.append(input_trajectory_file) 

104 

105 # Filter the structure if an output structure file was provided 

106 if output_structure_file: 

107 print(f'Filtering structure {input_structure_file.path} to {output_structure_file.path}') 

108 # Find a suitable function according to the formats 

109 # Note than given the nature of this logic we may encounter a function which converts the file format 

110 # This function is not intended for that but this is not a problem either 

111 # If the user specifies different input/output formats and the function can do it then go ahead 

112 request_format_set = { 

113 'inputs': { 

114 'input_structure_file': { input_structure_file.format }, 

115 'input_trajectory_file': { input_trajectory_file.format } 

116 }, 

117 'outputs': { 

118 'output_structure_file': { output_structure_file.format } 

119 } 

120 } 

121 suitable = next(get_format_set_suitable_function( 

122 available_functions=structure_filtering_functions, 

123 available_request_format_sets=[request_format_set], 

124 ), None) 

125 # If there is no function to handle this specific conversion we stop here 

126 if not suitable: 

127 if input_structure_file.format == output_structure_file.format: 

128 raise InputError(f'Filtering structure files in {input_structure_file.format} format is not supported') 

129 else: 

130 raise InputError(f'Filtering structure from {input_structure_file.format} to {output_structure_file.format} is not supported') 

131 filtering_function, formats = suitable 

132 # Find the function keywords 

133 # This is important since some functions may need a trajectory input in addition 

134 filtering_function_keywords = getfullargspec(filtering_function)[0] 

135 required_trajectory = 'input_trajectory_filename' in filtering_function_keywords 

136 # If we need a trajectory then pass it as argument as well 

137 if required_trajectory: 

138 # Make sure an input trajectory was passed 

139 if not input_trajectory_file: 

140 raise InputError('The structure input format ' + input_structure_file.format + 

141 ' is missing coordinates and the output format ' + output_structure_file.format + 

142 ' needs them. An input trajectory file is required.') 

143 filtering_function( 

144 input_structure_file=input_structure_file, 

145 input_trajectory_file=input_trajectory_file, 

146 output_structure_file=output_structure_file, 

147 input_selection=selection 

148 ) 

149 # Otherwise use just the structure as input 

150 else: 

151 filtering_function( 

152 input_structure_file=input_structure_file, 

153 output_structure_file=output_structure_file, 

154 input_selection=selection 

155 ) 

156 

157 # Filter the trajectory if an output trajectory file was provided 

158 if output_trajectory_file: 

159 print(f'Filtering trajectory {output_trajectory_file.path} to {output_trajectory_file.path}') 

160 # Otherwise, we must convert 

161 # Choose the right conversion function according to input and output formats 

162 request_format_set = { 

163 'inputs': { 

164 'input_structure_file': { input_structure_file.format }, 

165 'input_trajectory_file': { input_trajectory_file.format } 

166 }, 

167 'outputs': { 

168 'output_trajectory_file': { output_trajectory_file.format } 

169 } 

170 } 

171 suitable = next(get_format_set_suitable_function( 

172 available_functions=trajectory_filtering_functions, 

173 available_request_format_sets=[request_format_set], 

174 ), None) 

175 # If there is no function to handle the filtering then we stop here 

176 if not suitable: 

177 if input_trajectory_file.format == output_trajectory_file.format: 

178 raise InputError(f'Filtering trajectory files in {input_trajectory_file.format} format is not supported') 

179 else: 

180 raise InputError(f'Filtering trajectory from {input_trajectory_file.format} to {output_trajectory_file.format} is not supported') 

181 filtering_function, formats = suitable 

182 # Get the input structure expected format 

183 expected_input_structure_formats = formats['inputs'].get('input_structure_file', False) 

184 # If the function expects an input structure in any format then pass the structure 

185 if expected_input_structure_formats: 

186 filtering_function( 

187 input_structure_file=input_structure_file, 

188 input_trajectory_file=input_trajectory_file, 

189 output_trajectory_file=output_trajectory_file, 

190 input_selection=selection 

191 ) 

192 # If the function expects an input structure as None then pass None 

193 elif expected_input_structure_formats == None: 

194 filtering_function( 

195 input_structure_file=None, 

196 input_trajectory_file=input_trajectory_file, 

197 output_trajectory_file=output_trajectory_file, 

198 input_selection=selection 

199 ) 

200 # If the function does not expect an input structure then do not pass it 

201 else: 

202 filtering_function( 

203 input_trajectory_file=input_trajectory_file, 

204 output_trajectory_file=output_trajectory_file, 

205 input_selection=selection 

206 ) 

207 

208 # Remove generated symlinks, if any 

209 for symlink_file in symlink_files: 

210 symlink_file.remove()