Coverage for model_workflow/tools/get_pdb_frames.py: 100%

31 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1from model_workflow.utils.pyt_spells import get_pytraj_trajectory, get_reduced_pytraj_trajectory 

2from model_workflow.utils.auxiliar import reprint 

3from tqdm import tqdm 

4import os 

5from typing import Optional 

6 

7import pytraj as pt 

8# Build a generator which returns frames from the trajectory in pdb format 

9# The frames limit is the maximum number of frames to be iterated 

10# Note that the number of frames iterated may be less than the specified number 

11def get_pdb_frames ( 

12 topology_filename : str, 

13 trajectory_filename : str, 

14 snapshots : int, 

15 frames_limit : Optional[int] = None, 

16 output_frames_prefix : str = 'frame', 

17 pbar_bool : bool = False, 

18 patience=float('inf') 

19): 

20 # WARNING: Do not set a pytraj iterload trajectory and read its 'n_frames' to get the snapshots 

21 # WARNING: Trying to read the number o frames of a xtc trajectory will read the whole trajectory 

22 # WARNING: This may be a lot of time for a huge trajectory. Use the snapshots input instead 

23 

24 # In case we are missing a frames limit set the limit as the number os snapshots 

25 if frames_limit == None: 

26 frames_limit = snapshots 

27 

28 # Set a maximum number of frames 

29 # If trajectory has more frames than the limit create a reduced trajectory 

30 reduced_trajectory, frames_step, frames_count = get_reduced_pytraj_trajectory( 

31 topology_filename, 

32 trajectory_filename, 

33 snapshots, 

34 frames_limit 

35 ) 

36 

37 def frames_generator(): 

38 # Get the current directory at this point and use it to delete old files, in case we change the directory 

39 cwd = os.getcwd() 

40 # Create a progress bar 

41 n_frames = min(frames_count, patience) 

42 if pbar_bool: pbar = tqdm(initial=0, desc=' Frames', total=n_frames, unit='frame') 

43 # Or print an empty line for the reprint to not delete a previous log 

44 else: print() 

45 # Extract each frame in pdb format 

46 for f in range(n_frames): 

47 # Get the actual frame number 

48 # We display latter the frame with a +1 to make it 1-based instead of 0-based 

49 frame_number = f * frames_step 

50 # Update the current frame log 

51 if pbar_bool: pbar.update(1); pbar.refresh() 

52 else: reprint(f'Frame {frame_number+1} ({f+1} / {frames_count})') 

53 current_frame = f'{cwd}/{output_frames_prefix}{frame_number+1}.pdb' 

54 single_frame_trajectory = reduced_trajectory[f:f+1] 

55 pt.write_traj(current_frame, single_frame_trajectory, overwrite=True) 

56 yield current_frame 

57 # Delete current frame file before going for the next frame 

58 os.remove(current_frame) 

59 

60 return frames_generator(), frames_step, frames_count 

61 

62# Get a specific trajectory frame in pdb format 

63# Return the name of the generated file 

64def get_pdb_frame ( 

65 topology_filename : str, 

66 trajectory_filename : str, 

67 frame : int 

68) -> str: 

69 

70 # Load the trajectory using pytraj 

71 trajectory = get_pytraj_trajectory(topology_filename, trajectory_filename) 

72 trajectory_frame = trajectory[frame:frame+1] 

73 trajectory_frame_filename = f'frame{frame}.pdb' 

74 pt.write_traj(trajectory_frame_filename, trajectory_frame, overwrite=True) 

75 return trajectory_frame_filename