Coverage for model_workflow/utils/mdt_spells.py: 21%

158 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1import os 

2from os.path import exists, getsize 

3from subprocess import run, PIPE 

4from typing import List 

5 

6import mdtraj as mdt 

7import numpy as np 

8 

9from model_workflow.utils.file import File 

10from model_workflow.utils.gmx_spells import merge_xtc_files 

11from model_workflow.utils.constants import GREY_HEADER, COLOR_END 

12 

13CURSOR_UP_ONE = '\x1b[1A' 

14ERASE_LINE = '\x1b[2K' 

15ERASE_PREVIOUS_LINE = CURSOR_UP_ONE + ERASE_LINE + CURSOR_UP_ONE 

16 

17# Set mdtraj supported formats 

18mdtraj_supported_structure_formats = { 

19 'pdb', 'pdb.gz' 'h5', 'lh5', 'prmtop', 'parm7', 'prm7', 'psf', 'mol2', 'hoomdxml', 'gro', 'arc', 'hdf5', 'gsd' 

20} 

21mdtraj_supported_trajectory_formats = {'dcd', 'xtc', 'trr', 'nc', 'h5', 'binpos', 'mdcrd', 'xyz', 'pdb'} 

22 

23# Use mdtraj 'mdconvert' command-line script (there is no python version for this tool apparently) 

24# Multiple files may be selected with bash syntax 

25def merge_and_convert_trajectories ( 

26 input_trajectory_filenames : List[str], 

27 output_trajectory_filename : str 

28 ): 

29 

30 # Run MDtraj 

31 print(GREY_HEADER, end='\r') 

32 process = run([ 

33 "mdconvert", 

34 "-o", 

35 output_trajectory_filename, 

36 *input_trajectory_filenames, 

37 ], stderr=PIPE) 

38 error_logs = process.stderr.decode() 

39 print(COLOR_END, end='\r') 

40 

41 # If output has not been generated then warn the user 

42 if not exists(output_trajectory_filename) or getsize(output_trajectory_filename) == 0: 

43 print(error_logs) 

44 raise SystemExit('Something went wrong with MDTraj') 

45 

46# NEVER FORGET: mdconvert does not handle mdcrd format 

47mdconvert_supported_formats = {'dcd', 'xtc', 'trr', 'nc', 'h5', 'binpos'} 

48merge_and_convert_trajectories.format_sets = [ 

49 { 

50 'inputs': { 

51 'input_trajectory_filenames': mdconvert_supported_formats 

52 }, 

53 'outputs': { 

54 'output_trajectory_filename': mdconvert_supported_formats 

55 } 

56 }, 

57] 

58 

59# Merge and convert trajectories without the mdconvert command 

60# WARNING: This process is slow since we must iterate and merge each frame 

61# WARNING: This process is restricted to trr/xtc output files given the merger 

62def merge_and_convert_trajectories_alternative ( 

63 input_structure_filename : str, 

64 input_trajectory_filenames : List[str], 

65 output_trajectory_filename : str 

66 ): 

67 

68 # Assert we have input values 

69 if not input_structure_filename: 

70 raise SystemExit('ERROR: Missing input structure filenames') 

71 if not input_trajectory_filenames: 

72 raise SystemExit('ERROR: Missing input trajectory filenames') 

73 if not output_trajectory_filename: 

74 raise SystemExit('ERROR: Missing output trajectory filename') 

75 

76 # Load the topology, which is used further 

77 topology = mdt.load_topology(input_structure_filename) 

78 

79 # If the output trajectory filename matches any input file then we must rename the input filename to not overwrite it 

80 if output_trajectory_filename in input_trajectory_filenames: 

81 backup_filename = 'backup.' + output_trajectory_filename 

82 os.rename(output_trajectory_filename, backup_filename) 

83 repeated_input_filename_index = input_trajectory_filenames.index(output_trajectory_filename) 

84 input_trajectory_filenames[repeated_input_filename_index] = backup_filename 

85 

86 # If the output trajectory file already exists at this point then we must stop here 

87 # The raw trjcat implementation will not join things to the end of it 

88 if exists(output_trajectory_filename): 

89 raise SystemExit('The output file already exists and overwrite is not supported for this functionality') 

90 # Print an empty line for the first 'ERASE_PREVIOUS_LINE' to not delete a previous log 

91 print() 

92 # Iterate over the different input trajectory filenames 

93 frame_filename = '.frame.xtc' 

94 for input_trajectory_filename in input_trajectory_filenames: 

95 # Load the trajectory frame by frame 

96 trajectory = mdt.iterload(input_trajectory_filename, top=input_structure_filename, chunk=1) 

97 for f, frame in enumerate(trajectory): 

98 # Update the current frame log 

99 print(ERASE_PREVIOUS_LINE) 

100 print('Frame ' + str(f)) 

101 frame.save(frame_filename) 

102 # Join current frame to the output trajectory 

103 merge_xtc_files(output_trajectory_filename, frame_filename) 

104 # Remove the residual file 

105 # WARNING: It may not exist if the trajectory has 1 frame 

106 if exists(frame_filename): 

107 os.remove(frame_filename) 

108merge_and_convert_trajectories_alternative.format_sets = [ 

109 { 

110 'inputs': { 

111 'input_structure_filename': mdtraj_supported_structure_formats, 

112 'input_trajectory_filenames': mdtraj_supported_trajectory_formats 

113 }, 

114 'outputs': { 

115 'output_trajectory_filename': {'xtc', 'trr'} 

116 } 

117 }, 

118] 

119 

120# Merge and convert trajectories without the mdconvert command 

121# WARNING: Note that this process is not memory efficient so beware the size of trajectories to be converted 

122# DEPRECATED: This function was meant to convert trajectories to mdcrd, which is not supported by mdconvert 

123# DEPRECATED: However the export to mdcrd in mdtraj does not allow to remove the periodic box, which may be a problem 

124# DEPRECTAED: Use VMD instead 

125def merge_and_convert_trajectories_unefficient ( 

126 input_structure_filename : str, 

127 input_trajectory_filenames : List[str], 

128 output_trajectory_filename : str, 

129): 

130 print('WARNING: You are using a not memory efficient tool. If the trajectory is too big your system may not hold it.') 

131 # Load all trajectories together 

132 sample_trajectory = input_trajectory_filenames[0] 

133 if input_structure_filename: 

134 trajectory = mdt.load(sample_trajectory, top=input_structure_filename) 

135 else: 

136 trajectory = mdt.load(sample_trajectory) 

137 for extra_trajectory in input_trajectory_filenames[1:]: 

138 if input_structure_filename: 

139 extra = mdt.load(sample_trajectory, top=input_structure_filename) 

140 else: 

141 extra = mdt.load(sample_trajectory) 

142 trajectory = mdt.join([trajectory, extra], check_topology=False) 

143 

144 # Write the whole trajectory 

145 trajectory.save(output_trajectory_filename) 

146 

147merge_and_convert_trajectories_unefficient.format_sets = [ 

148 { 

149 'inputs': { 

150 'input_structure_filename': mdtraj_supported_structure_formats, 

151 'input_trajectory_filenames': mdtraj_supported_trajectory_formats 

152 }, 

153 'outputs': { 

154 'output_trajectory_filename': mdtraj_supported_trajectory_formats 

155 } 

156 }, 

157] 

158 

159# Get specific frames from a trajectory 

160# WARNING: This function is time unefficient 

161def get_trajectory_subset ( 

162 input_structure_filename : str, 

163 input_trajectory_filename : str, 

164 output_trajectory_filename : str, 

165 start : int = 0, 

166 end : int = None, 

167 step : int = 1, 

168 frames : List[int] = [], 

169 skip : List[int] = [], 

170): 

171 

172 # Load the trajectory frame by frame and get only the desired frames 

173 if input_structure_filename: 

174 trajectory = mdt.iterload(input_trajectory_filename, top=input_structure_filename, chunk=1) 

175 else: 

176 trajectory = mdt.iterload(input_trajectory_filename, chunk=1) 

177 # Set the reduced trajectory to be returned 

178 reduced_trajectory = None 

179 # Specific frames scenario 

180 # DANI: No se ha provado 

181 if frames and len(frames) > 0: 

182 # Get the first and the last frames 

183 sorted_frames = sorted(frames) 

184 first_frame = sorted_frames[0] 

185 last_frame = sorted_frames[-1] 

186 # Get the list of frames as a set to speed up the searches 

187 selected_frames = set(frames) 

188 # Get the first chunk/frame 

189 for frame, chunk in enumerate(trajectory): 

190 if frame == first_frame: 

191 reduced_trajectory = chunk 

192 break 

193 # If we have nothing at this point then it means our start is out of the frames range 

194 if not reduced_trajectory: 

195 raise SystemExit('None of the selected frames are in range of the trajectory frames number') 

196 # Get further chunks/frames 

197 for frame, chunk in enumerate(trajectory, first_frame + 1): 

198 if frame in selected_frames: 

199 reduced_trajectory = mdt.join([reduced_trajectory, chunk], check_topology=False) 

200 # If this is the last frame then stop here to avoid reading the rest of the trajectory 

201 if frame >= last_frame: 

202 break 

203 # Start / End / Step + Skip scenario 

204 else: 

205 # Get the first chunk 

206 frame_count = 0 # This count works only in case the start frame is out of range, for the logs 

207 for i, chunk in enumerate(trajectory): 

208 frame_count = i 

209 if i == start: 

210 reduced_trajectory = chunk 

211 break 

212 # If we have nothing at this point then it means our start is out of the frames range 

213 if not reduced_trajectory: 

214 frame_count += 1 

215 raise SystemExit('The trajectory has ' + str(frame_count) + ' frames so we can not start at frame ' + str(start)) 

216 # Get further chunks 

217 for i, chunk in enumerate(trajectory, 1): # Start the count at 1 

218 frame = start + i 

219 if frame in skip: 

220 continue 

221 if frame == end: 

222 break 

223 if i % step == 0: 

224 reduced_trajectory = mdt.join([reduced_trajectory, chunk], check_topology=False) 

225 

226 # WARNING: This print here is not just a log. DO NOT REMOVE IT 

227 # WARNING: It fixes an error (ValueError: unitcell angle < 0) which happens sometimes 

228 # WARNING: It fixes an error (ValueError: Only rectilinear boxes can be saved to mdcrd files) which happens sometimes 

229 # DANI: Esto es azaroso: a veces funciona y a veces no 

230 #print(reduced_trajectory) 

231 

232 # This is necessary sometimes to avoid the following error: 

233 # ValueError: Only rectilinear boxes can be saved to mdcrd files 

234 # DANI: Esto suele funcionar, aunque a veces mata el proceso 

235 # e.g. ValueError: unitcell_lengths must be shape (5, 3). You supplied (1, 3) 

236 # Este ejemplo fue observado con las trayectorias raw de 'NAFlex_1d11' 

237 # mdtb subset -is structure.stripped.pdb -it structure.stripped.mdcrd -ot test.mdcrd -start 2 -end 17 -step 3 

238 # DANI: Va bien que esto esté desactivado, así cuando falle podré compartir la trayectoria que da problemas a los de mdtraj: 

239 # https://github.com/mdtraj/mdtraj/issues/1730 

240 #reduced_trajectory.unitcell_lengths = [0,0,0] 

241 #reduced_trajectory.unitcell_angles = [90,90,90] 

242 

243 # Write reduced trajectory to output file 

244 reduced_trajectory.save(output_trajectory_filename) 

245get_trajectory_subset.format_sets = [ 

246 { 

247 'inputs': { 

248 'input_structure_filename': mdtraj_supported_structure_formats, 

249 'input_trajectory_filename': mdtraj_supported_trajectory_formats 

250 }, 

251 'outputs': { 

252 'output_trajectory_filename': mdtraj_supported_trajectory_formats 

253 } 

254 }, 

255 { 

256 'inputs': { 

257 'input_structure_filename': None, 

258 'input_trajectory_filename': mdtraj_supported_structure_formats 

259 }, 

260 'outputs': { 

261 'output_trajectory_filename': mdtraj_supported_structure_formats 

262 } 

263 } 

264] 

265 

266# Get first frame from a trajectory 

267def get_first_frame ( 

268 input_structure_filename : str, 

269 input_trajectory_filename : str, 

270 output_frame_filename : str, 

271): 

272 get_trajectory_subset(input_structure_filename, input_trajectory_filename, output_frame_filename, 0) 

273# Set function supported formats 

274single_frame_getter_format_sets = [ 

275 { 

276 'inputs': { 

277 'input_structure_filename': None, 

278 'input_trajectory_filename': mdtraj_supported_trajectory_formats 

279 }, 

280 'outputs': { 

281 'output_frame_filename': mdtraj_supported_trajectory_formats 

282 } 

283 }, 

284 { 

285 'inputs': { 

286 'input_structure_filename': mdtraj_supported_structure_formats, 

287 'input_trajectory_filename': mdtraj_supported_trajectory_formats 

288 }, 

289 'outputs': { 

290 'output_frame_filename': { *mdtraj_supported_structure_formats, *mdtraj_supported_trajectory_formats } 

291 } 

292 }, 

293] 

294get_first_frame.format_sets = single_frame_getter_format_sets 

295 

296# Split a trajectory which is actually a merge of independent trajectories back to the original pieces 

297# Run an RMSD analysis to guess where the pieces are 

298# The cutoff sets the minimum RMSD change to consider it is a different trajectory 

299def split_merged_trajectories ( 

300 input_structure_filename : str, 

301 input_trajectory_filename : str, 

302 sudden_jump_cutoff : float = 0.2, 

303 output_trajectory_prefix : str = 'split'): 

304 # Get the input trajectory format 

305 input_trajectory_file = File(input_trajectory_filename) 

306 input_trajectory_format = input_trajectory_file.format 

307 # The cutoff must be a negative number since independent trajectories RMSD sudden jumps will be negative 

308 cutoff = -abs(sudden_jump_cutoff) 

309 # Load the trajectory 

310 trajectory = mdt.load(input_trajectory_filename, top=input_structure_filename) 

311 # Run a RMSD analysis 

312 rmsd_data = mdt.rmsd(trajectory, trajectory, 0) 

313 # Find sudden jumps in RMSD values 

314 sudden_jumps = [] 

315 previous_value = 0 

316 for i, value in enumerate(rmsd_data): 

317 diff = value - previous_value 

318 # In case this is a new trajectory the jump will be negative 

319 if diff < cutoff: 

320 print('New trajectory at frame ' + str(i)) 

321 sudden_jumps.append(i) 

322 previous_value = value 

323 # In there was no jumps then stop here 

324 if len(sudden_jumps) == 0: 

325 print('Apparently there is a single trajectory') 

326 return 

327 # Generate a trajectory subset for each cut 

328 cut_points = [ 0, *sudden_jumps, len(rmsd_data) ] 

329 for i in range(len(cut_points) -1): 

330 start = cut_points[i] 

331 end = cut_points[i+1] 

332 trajectory_split = trajectory[start:end] 

333 split_filename = output_trajectory_prefix + '_' + str(i+1) + '.' + input_trajectory_format 

334 print('Writting from frame ' + str(start) + ' to frame ' + str(end) + ' to "' + split_filename + '"') 

335 trajectory_split.save(split_filename) 

336 

337# Sort atoms in a trajectory file by sorting its coordinates 

338# A new atom indices list is expected in order to do the sorting 

339def sort_trajectory_atoms ( 

340 input_structure_file : 'File', 

341 input_trajectory_file : 'File', 

342 output_trajectory_file : 'File', 

343 new_atom_indices : List[int] 

344): 

345 

346 # Load the topology, which is used further 

347 topology = mdt.load_topology(input_structure_file.path) 

348 

349 # Check the new atom indices to match the number of atoms in the topology 

350 if len(new_atom_indices) != topology.n_atoms: 

351 raise ValueError('The number of atom indices (' + str(len(new_atom_indices)) + ') does not match the number of atoms in topology(' + str(topology.n_atoms) + ')' ) 

352 

353 # If the input and the output trajectory filenames are equal then we must rename the input filename to not overwrite it 

354 if input_trajectory_file.path == output_trajectory_file.path: 

355 backup_file = input_trajectory_file.get_prefixed_file('backup.') 

356 os.rename(input_trajectory_file.path, backup_file.path) 

357 input_trajectory_file = backup_file 

358 

359 # If the output trajectory file already exists at this point then we must stop here 

360 # The raw trjcat implementation will not join things to the end of it 

361 if output_trajectory_file.exists: 

362 raise SystemExit('The output file already exists and overwrite is not supported for this functionality') 

363 

364 # Load the trajectory frame by frame 

365 trajectory = mdt.iterload(input_trajectory_file.path, top=input_structure_file.path, chunk=1) 

366 frame_filename = '.frame.xtc' 

367 

368 # Print an empty line for the first 'ERASE_PREVIOUS_LINE' to not delete a previous log 

369 print() 

370 

371 for f, frame in enumerate(trajectory): 

372 # Update the current frame log 

373 print(ERASE_PREVIOUS_LINE) 

374 print('Frame ' + str(f)) 

375 

376 # Sort coordinates according to new atom indices 

377 xyz = frame.xyz[0] 

378 new_xyz = np.empty(shape=xyz.shape) 

379 for i, atom_index in enumerate(new_atom_indices): 

380 new_xyz[i] = xyz[atom_index] 

381 

382 # Export the current frame 

383 new_frame = mdt.Trajectory([new_xyz], 

384 topology=topology, 

385 time=frame.time, # This is necessary for the new mdtraj trajectory to be functional 

386 unitcell_lengths=frame.unitcell_lengths, # This is necessary for the new mdtraj trajectory to be functional 

387 unitcell_angles=frame.unitcell_angles # This is necessary for the new mdtraj trajectory to be functional 

388 ) 

389 new_frame.save(frame_filename) 

390 # Join current frame to the output trajectory 

391 merge_xtc_files(output_trajectory_file.path, frame_filename) 

392 

393 # Remove the residual file 

394 # WARNING: It may not exist if the trajectory has 1 frame 

395 if exists(frame_filename): 

396 os.remove(frame_filename)