Coverage for mddb_workflow / analyses / pca.py: 89%
72 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 18:45 +0000
1# Principal component analysis (PCA)
2from sklearn.decomposition import PCA
3import numpy as np
6import mdtraj as mdt
8from mddb_workflow.tools.get_reduced_trajectory import get_reduced_trajectory
9from mddb_workflow.utils.auxiliar import warn, save_json
10from mddb_workflow.utils.constants import OUTPUT_PCA_FILENAME, OUTPUT_PCA_PROJECTION_PREFIX
11from mddb_workflow.utils.type_hints import *
13def pca (
14 structure_file : 'File',
15 trajectory_file : 'File',
16 output_directory : str,
17 snapshots : int,
18 structure : 'Structure',
19 pca_fit_selection : str,
20 pca_analysis_selection : str,
21 pbc_selection : 'Selection',
22 frames_limit : int = 2000,
23 projection_frames : int = 20
24) -> dict:
25 """Perform a PCA analysis on the trajectory."""
26 # Set output filepaths
27 output_analysis_filepath = f'{output_directory}/{OUTPUT_PCA_FILENAME}'
28 output_trajectory_projections_prefix = f'{output_directory}/{OUTPUT_PCA_PROJECTION_PREFIX}'
30 # If trajectory frames number is bigger than the limit we create a reduced trajectory
31 pca_trajectory_filepath, step, frames = get_reduced_trajectory(
32 structure_file,
33 trajectory_file,
34 snapshots,
35 frames_limit,
36 )
38 # Parse the string selections
39 # VMD selection syntax
40 parsed_fit_selection = structure.select(pca_fit_selection, syntax='vmd')
41 if not parsed_fit_selection:
42 print(f' PCA fit selection: {pca_fit_selection}')
43 warn('PCA fit selection is empty -> Using all heavy atoms in the structure instead')
44 parsed_fit_selection = structure.select_heavy_atoms()
45 parsed_analysis_selection = structure.select(pca_analysis_selection, syntax='vmd')
46 if not parsed_analysis_selection:
47 print(f' PCA analysis selection: {pca_analysis_selection}')
48 warn('PCA analysis selection is empty -> Using all heavy atoms in the structure instead')
49 parsed_analysis_selection = structure.select_heavy_atoms()
51 # Parse PBC residues into a selection to be removed from PCA selections
52 parsed_fit_selection -= pbc_selection
53 parsed_analysis_selection -= pbc_selection
55 # If no selection is available at this point then stop here
56 # This may happen when the whole system is in PCB
57 if not parsed_analysis_selection:
58 print(' No selection to do PCA')
59 return
61 # Load the trajectory
62 mdtraj_trajectory = mdt.load(pca_trajectory_filepath, top=structure_file.path)
63 # Fit the trajectory according to the specified fit selection
64 mdtraj_trajectory.superpose(mdtraj_trajectory, frame=0, atom_indices=parsed_fit_selection.atom_indices)
65 # Check if all coordinates got identical
66 # This is a MDtraj bug which may appear with wide spaced systems
67 # https://github.com/mdtraj/mdtraj/issues/2108
68 coordinates_sample = set([ tuple(coord) for coord in mdtraj_trajectory.xyz[0][0:5] ])
69 if len(coordinates_sample) == 1:
70 warn('There was a problem with MDtraj due to your system being too wide spaced. Attempting to fix...')
71 del mdtraj_trajectory
72 # Load the trajectory again
73 mdtraj_trajectory = mdt.load(pca_trajectory_filepath, top=structure_file.path)
74 # Divide all coordinates by 10 before superposing and then multiply by 10 back
75 mdtraj_trajectory._xyz /= 10
76 mdtraj_trajectory.superpose(mdtraj_trajectory, frame=0, atom_indices=parsed_fit_selection.atom_indices)
77 mdtraj_trajectory._xyz *= 10
78 # Filter the atoms to be analized
79 atom_indices = parsed_analysis_selection.atom_indices
80 mdtraj_trajectory = mdtraj_trajectory.atom_slice(atom_indices=atom_indices)
81 # Reshape data to a sklearn-friendly format
82 frames_number = mdtraj_trajectory.n_frames
83 atoms_number = mdtraj_trajectory.n_atoms
84 reshape = mdtraj_trajectory.xyz.reshape(frames_number, atoms_number * 3)
86 # Perform the PCA using sklearn
87 pca = PCA()
88 transformed = pca.fit_transform(reshape)
89 # DANI: Al transponer esto las proyecciones parece que tienen más sentido
90 transformed = transformed.transpose()
92 # Get eigenvalues
93 # Multiply values by 100 since values are in namometers (squared) and we want Ångstroms
94 raw_eigenvalues = pca.explained_variance_.tolist()
95 eigenvalues = [ ev * 100 for ev in raw_eigenvalues ]
96 # Get eigenvectors
97 raw_eigenvectors = pca.components_
99 # Get the mean structure coordinates
100 mean = pca.mean_.flatten()
102 # Get the total explained variance by adding all eigenvalues
103 total = sum(eigenvalues)
105 # Now get the projection for those eigenvectors whose eigenvalue is greater than 1% of the total explained variance
106 # Eigenvalues are ordered from greater to lower by default, so we stop at the first value lower than 1%
107 # Save projections from those eigenvectors
108 projections = []
109 cutoff = total / 100
110 for i, value in enumerate(eigenvalues):
111 if value < cutoff: break
112 # This logic was copied from here:
113 # https://userguide.mdanalysis.org/stable/examples/analysis/reduced_dimensions/pca.html
114 eigenvector = [ float(v) for v in raw_eigenvectors[i] ]
115 frame_projections = transformed[i]
116 offset = np.outer(frame_projections, eigenvector)
117 trajectory_projection = mean + offset
118 coordinates = trajectory_projection.reshape(len(frame_projections), -1, 3)
119 # Now we have the time dependent projection of the principal component
120 # However, we will sort frames according to the projection value
121 # In addition we will take only a few frames
122 max_projection = max(frame_projections)
123 min_projection = min(frame_projections)
124 projection_step = (max_projection - min_projection) / (projection_frames - 1)
125 selected_projections = [ min_projection + projection_step * s for s in range(projection_frames) ]
126 selected_coordinates = [ coordinates[get_closer_value_index(frame_projections, p)] for p in selected_projections ]
127 # Load coordinates in mdtraj and export the trajectory to xtc
128 trajectory_projection = mdt.Trajectory(selected_coordinates, mdtraj_trajectory.topology)
129 trajectory_projection_filename = output_trajectory_projections_prefix + '_' + str(i+1).zfill(2) + '.xtc'
130 trajectory_projection.save_xtc(trajectory_projection_filename)
131 # Save projections to be further exported to json
132 projections.append([ float(v) for v in frame_projections ])
134 # DANI: Usa esto para generar una estructura (pdb) que te permita visualizar las trajectory projections
135 #trajectory_projection[0].save_pdb('pca.trajectory_projection_structure.pdb')
137 # Set the output dict
138 data = {
139 'framestep': step,
140 'atoms': atom_indices,
141 'eigenvalues': eigenvalues,
142 'projections': projections
143 }
144 # Finally, export the analysis in json format
145 save_json({'data': data}, output_analysis_filepath)
147# Given an array with numbers, get the index of the value which is closer
148def get_closer_value_index (list : list, value : float) -> int:
149 distances = [ abs(value-v) for v in list ]
150 return min(range(len(distances)), key=distances.__getitem__)