Coverage for model_workflow/analyses/pca.py: 88%

64 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-23 10:54 +0000

1# Principal component analysis (PCA) 

2from sklearn.decomposition import PCA 

3import numpy as np 

4 

5 

6import mdtraj as mdt 

7 

8from model_workflow.tools.get_reduced_trajectory import get_reduced_trajectory 

9from model_workflow.utils.auxiliar import warn, save_json 

10from model_workflow.utils.constants import OUTPUT_PCA_FILENAME, OUTPUT_PCA_PROJECTION_PREFIX 

11from model_workflow.utils.type_hints import * 

12 

13def pca ( 

14 structure_file : 'File', 

15 trajectory_file : 'File', 

16 output_directory : str, 

17 snapshots : int, 

18 structure : 'Structure', 

19 pca_fit_selection : str, 

20 pca_analysis_selection : str, 

21 pbc_selection : 'Selection', 

22 frames_limit : int = 2000, 

23 projection_frames : int = 20 

24) -> dict: 

25 """Perform a PCA analysis on the trajectory.""" 

26 # Set output filepaths 

27 output_analysis_filepath = f'{output_directory}/{OUTPUT_PCA_FILENAME}' 

28 output_trajectory_projections_prefix = f'{output_directory}/{OUTPUT_PCA_PROJECTION_PREFIX}' 

29 

30 # If trajectory frames number is bigger than the limit we create a reduced trajectory 

31 pca_trajectory_filepath, step, frames = get_reduced_trajectory( 

32 structure_file, 

33 trajectory_file, 

34 snapshots, 

35 frames_limit, 

36 ) 

37 

38 # Parse the string selections 

39 # VMD selection syntax 

40 parsed_fit_selection = structure.select(pca_fit_selection, syntax='vmd') 

41 if not parsed_fit_selection: 

42 print(f' PCA fit selection: {pca_fit_selection}') 

43 warn('PCA fit selection is empty -> Using all heavy atoms in the structure instead') 

44 parsed_fit_selection = structure.select_heavy_atoms() 

45 parsed_analysis_selection = structure.select(pca_analysis_selection, syntax='vmd') 

46 if not parsed_analysis_selection: 

47 print(f' PCA analysis selection: {pca_analysis_selection}') 

48 warn('PCA analysis selection is empty -> Using all heavy atoms in the structure instead') 

49 parsed_analysis_selection = structure.select_heavy_atoms() 

50 

51 # Parse PBC residues into a selection to be removed from PCA selections 

52 parsed_fit_selection -= pbc_selection 

53 parsed_analysis_selection -= pbc_selection 

54 

55 # If no selection is available at this point then stop here 

56 # This may happen when the whole system is in PCB 

57 if not parsed_analysis_selection: 

58 print(' No selection to do PCA') 

59 return 

60 

61 # Load the trajectory 

62 mdtraj_trajectory = mdt.load(pca_trajectory_filepath, top=structure_file.path) 

63 # Fit the trajectory according to the specified fit selection 

64 mdtraj_trajectory.superpose(mdtraj_trajectory, frame=0, atom_indices=parsed_fit_selection.atom_indices) 

65 # Filter the atoms to be analized 

66 atom_indices = parsed_analysis_selection.atom_indices 

67 mdtraj_trajectory = mdtraj_trajectory.atom_slice(atom_indices=atom_indices) 

68 # Reshape data to a sklearn-friendly format 

69 frames_number = mdtraj_trajectory.n_frames 

70 atoms_number = mdtraj_trajectory.n_atoms 

71 reshape = mdtraj_trajectory.xyz.reshape(frames_number, atoms_number * 3) 

72 

73 # Perform the PCA using sklearn 

74 pca = PCA() 

75 transformed = pca.fit_transform(reshape) 

76 # DANI: Al transponer esto las proyecciones parece que tienen más sentido 

77 transformed = transformed.transpose() 

78 

79 # Get eigenvalues 

80 # Multiply values by 100 since values are in namometers (squared) and we want Ångstroms 

81 eigenvalues = [ ev * 100 for ev in pca.explained_variance_ ] 

82 # Get eigenvectors 

83 eigenvectors = [ [ float(v) for v in eigenvector ] for eigenvector in pca.components_ ] 

84 

85 # Get the mean structure coordinates 

86 mean = pca.mean_.flatten() 

87 

88 # Get the total explained variance by adding all eigenvalues 

89 total = sum(eigenvalues) 

90 

91 # Now get the projection for those eigenvectors whose eigenvalue is greater than 1% of the total explained variance 

92 # Eigenvalues are ordered from greater to lower by default, so we stop at the first value lower than 1% 

93 # Save projections from those eigenvectors 

94 projections = [] 

95 cutoff = total / 100 

96 for i, value in enumerate(eigenvalues): 

97 if value < cutoff: 

98 break 

99 # This logic was copied from here: 

100 # https://userguide.mdanalysis.org/stable/examples/analysis/reduced_dimensions/pca.html 

101 eigenvector = eigenvectors[i] 

102 frame_projections = transformed[i] 

103 offset = np.outer(frame_projections, eigenvector) 

104 trajectory_projection = mean + offset 

105 coordinates = trajectory_projection.reshape(len(frame_projections), -1, 3) 

106 # Now we have the time dependent projection of the principal component 

107 # However, we will sort frames according to the projection value 

108 # In addition we will take only a few frames 

109 max_projection = max(frame_projections) 

110 min_projection = min(frame_projections) 

111 projection_step = (max_projection - min_projection) / (projection_frames - 1) 

112 selected_projections = [ min_projection + projection_step * s for s in range(projection_frames) ] 

113 selected_coordinates = [ coordinates[get_closer_value_index(frame_projections, p)] for p in selected_projections ] 

114 # Load coordinates in mdtraj and export the trajectory to xtc 

115 trajectory_projection = mdt.Trajectory(selected_coordinates, mdtraj_trajectory.topology) 

116 trajectory_projection_filename = output_trajectory_projections_prefix + '_' + str(i+1).zfill(2) + '.xtc' 

117 trajectory_projection.save_xtc(trajectory_projection_filename) 

118 # Save projections to be further exported to json 

119 projections.append([ float(v) for v in frame_projections ]) 

120 

121 # DANI: Usa esto para generar una estructura (pdb) que te permita visualizar las trajectory projections 

122 #trajectory_projection[0].save_pdb('pca.trajectory_projection_structure.pdb') 

123 

124 # Set the output dict 

125 data = { 

126 'framestep': step, 

127 'atoms': atom_indices, 

128 'eigenvalues': eigenvalues, 

129 'projections': projections 

130 } 

131 # Finally, export the analysis in json format 

132 save_json({'data': data}, output_analysis_filepath) 

133 

134# Given an array with numbers, get the index of the value which is closer 

135def get_closer_value_index (list : list, value : float) -> int: 

136 distances = [ abs(value-v) for v in list ] 

137 return min(range(len(distances)), key=distances.__getitem__)