Coverage for mddb_workflow / analyses / pca.py: 89%

72 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 18:45 +0000

1# Principal component analysis (PCA) 

2from sklearn.decomposition import PCA 

3import numpy as np 

4 

5 

6import mdtraj as mdt 

7 

8from mddb_workflow.tools.get_reduced_trajectory import get_reduced_trajectory 

9from mddb_workflow.utils.auxiliar import warn, save_json 

10from mddb_workflow.utils.constants import OUTPUT_PCA_FILENAME, OUTPUT_PCA_PROJECTION_PREFIX 

11from mddb_workflow.utils.type_hints import * 

12 

13def pca ( 

14 structure_file : 'File', 

15 trajectory_file : 'File', 

16 output_directory : str, 

17 snapshots : int, 

18 structure : 'Structure', 

19 pca_fit_selection : str, 

20 pca_analysis_selection : str, 

21 pbc_selection : 'Selection', 

22 frames_limit : int = 2000, 

23 projection_frames : int = 20 

24) -> dict: 

25 """Perform a PCA analysis on the trajectory.""" 

26 # Set output filepaths 

27 output_analysis_filepath = f'{output_directory}/{OUTPUT_PCA_FILENAME}' 

28 output_trajectory_projections_prefix = f'{output_directory}/{OUTPUT_PCA_PROJECTION_PREFIX}' 

29 

30 # If trajectory frames number is bigger than the limit we create a reduced trajectory 

31 pca_trajectory_filepath, step, frames = get_reduced_trajectory( 

32 structure_file, 

33 trajectory_file, 

34 snapshots, 

35 frames_limit, 

36 ) 

37 

38 # Parse the string selections 

39 # VMD selection syntax 

40 parsed_fit_selection = structure.select(pca_fit_selection, syntax='vmd') 

41 if not parsed_fit_selection: 

42 print(f' PCA fit selection: {pca_fit_selection}') 

43 warn('PCA fit selection is empty -> Using all heavy atoms in the structure instead') 

44 parsed_fit_selection = structure.select_heavy_atoms() 

45 parsed_analysis_selection = structure.select(pca_analysis_selection, syntax='vmd') 

46 if not parsed_analysis_selection: 

47 print(f' PCA analysis selection: {pca_analysis_selection}') 

48 warn('PCA analysis selection is empty -> Using all heavy atoms in the structure instead') 

49 parsed_analysis_selection = structure.select_heavy_atoms() 

50 

51 # Parse PBC residues into a selection to be removed from PCA selections 

52 parsed_fit_selection -= pbc_selection 

53 parsed_analysis_selection -= pbc_selection 

54 

55 # If no selection is available at this point then stop here 

56 # This may happen when the whole system is in PCB 

57 if not parsed_analysis_selection: 

58 print(' No selection to do PCA') 

59 return 

60 

61 # Load the trajectory 

62 mdtraj_trajectory = mdt.load(pca_trajectory_filepath, top=structure_file.path) 

63 # Fit the trajectory according to the specified fit selection 

64 mdtraj_trajectory.superpose(mdtraj_trajectory, frame=0, atom_indices=parsed_fit_selection.atom_indices) 

65 # Check if all coordinates got identical 

66 # This is a MDtraj bug which may appear with wide spaced systems 

67 # https://github.com/mdtraj/mdtraj/issues/2108 

68 coordinates_sample = set([ tuple(coord) for coord in mdtraj_trajectory.xyz[0][0:5] ]) 

69 if len(coordinates_sample) == 1: 

70 warn('There was a problem with MDtraj due to your system being too wide spaced. Attempting to fix...') 

71 del mdtraj_trajectory 

72 # Load the trajectory again 

73 mdtraj_trajectory = mdt.load(pca_trajectory_filepath, top=structure_file.path) 

74 # Divide all coordinates by 10 before superposing and then multiply by 10 back 

75 mdtraj_trajectory._xyz /= 10 

76 mdtraj_trajectory.superpose(mdtraj_trajectory, frame=0, atom_indices=parsed_fit_selection.atom_indices) 

77 mdtraj_trajectory._xyz *= 10 

78 # Filter the atoms to be analized 

79 atom_indices = parsed_analysis_selection.atom_indices 

80 mdtraj_trajectory = mdtraj_trajectory.atom_slice(atom_indices=atom_indices) 

81 # Reshape data to a sklearn-friendly format 

82 frames_number = mdtraj_trajectory.n_frames 

83 atoms_number = mdtraj_trajectory.n_atoms 

84 reshape = mdtraj_trajectory.xyz.reshape(frames_number, atoms_number * 3) 

85 

86 # Perform the PCA using sklearn 

87 pca = PCA() 

88 transformed = pca.fit_transform(reshape) 

89 # DANI: Al transponer esto las proyecciones parece que tienen más sentido 

90 transformed = transformed.transpose() 

91 

92 # Get eigenvalues 

93 # Multiply values by 100 since values are in namometers (squared) and we want Ångstroms 

94 raw_eigenvalues = pca.explained_variance_.tolist() 

95 eigenvalues = [ ev * 100 for ev in raw_eigenvalues ] 

96 # Get eigenvectors 

97 raw_eigenvectors = pca.components_ 

98 

99 # Get the mean structure coordinates 

100 mean = pca.mean_.flatten() 

101 

102 # Get the total explained variance by adding all eigenvalues 

103 total = sum(eigenvalues) 

104 

105 # Now get the projection for those eigenvectors whose eigenvalue is greater than 1% of the total explained variance 

106 # Eigenvalues are ordered from greater to lower by default, so we stop at the first value lower than 1% 

107 # Save projections from those eigenvectors 

108 projections = [] 

109 cutoff = total / 100 

110 for i, value in enumerate(eigenvalues): 

111 if value < cutoff: break 

112 # This logic was copied from here: 

113 # https://userguide.mdanalysis.org/stable/examples/analysis/reduced_dimensions/pca.html 

114 eigenvector = [ float(v) for v in raw_eigenvectors[i] ] 

115 frame_projections = transformed[i] 

116 offset = np.outer(frame_projections, eigenvector) 

117 trajectory_projection = mean + offset 

118 coordinates = trajectory_projection.reshape(len(frame_projections), -1, 3) 

119 # Now we have the time dependent projection of the principal component 

120 # However, we will sort frames according to the projection value 

121 # In addition we will take only a few frames 

122 max_projection = max(frame_projections) 

123 min_projection = min(frame_projections) 

124 projection_step = (max_projection - min_projection) / (projection_frames - 1) 

125 selected_projections = [ min_projection + projection_step * s for s in range(projection_frames) ] 

126 selected_coordinates = [ coordinates[get_closer_value_index(frame_projections, p)] for p in selected_projections ] 

127 # Load coordinates in mdtraj and export the trajectory to xtc 

128 trajectory_projection = mdt.Trajectory(selected_coordinates, mdtraj_trajectory.topology) 

129 trajectory_projection_filename = output_trajectory_projections_prefix + '_' + str(i+1).zfill(2) + '.xtc' 

130 trajectory_projection.save_xtc(trajectory_projection_filename) 

131 # Save projections to be further exported to json 

132 projections.append([ float(v) for v in frame_projections ]) 

133 

134 # DANI: Usa esto para generar una estructura (pdb) que te permita visualizar las trajectory projections 

135 #trajectory_projection[0].save_pdb('pca.trajectory_projection_structure.pdb') 

136 

137 # Set the output dict 

138 data = { 

139 'framestep': step, 

140 'atoms': atom_indices, 

141 'eigenvalues': eigenvalues, 

142 'projections': projections 

143 } 

144 # Finally, export the analysis in json format 

145 save_json({'data': data}, output_analysis_filepath) 

146 

147# Given an array with numbers, get the index of the value which is closer 

148def get_closer_value_index (list : list, value : float) -> int: 

149 distances = [ abs(value-v) for v in list ] 

150 return min(range(len(distances)), key=distances.__getitem__)