Coverage for model_workflow/analyses/pca.py: 88%
64 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-23 10:54 +0000
1# Principal component analysis (PCA)
2from sklearn.decomposition import PCA
3import numpy as np
6import mdtraj as mdt
8from model_workflow.tools.get_reduced_trajectory import get_reduced_trajectory
9from model_workflow.utils.auxiliar import warn, save_json
10from model_workflow.utils.constants import OUTPUT_PCA_FILENAME, OUTPUT_PCA_PROJECTION_PREFIX
11from model_workflow.utils.type_hints import *
13def pca (
14 structure_file : 'File',
15 trajectory_file : 'File',
16 output_directory : str,
17 snapshots : int,
18 structure : 'Structure',
19 pca_fit_selection : str,
20 pca_analysis_selection : str,
21 pbc_selection : 'Selection',
22 frames_limit : int = 2000,
23 projection_frames : int = 20
24) -> dict:
25 """Perform a PCA analysis on the trajectory."""
26 # Set output filepaths
27 output_analysis_filepath = f'{output_directory}/{OUTPUT_PCA_FILENAME}'
28 output_trajectory_projections_prefix = f'{output_directory}/{OUTPUT_PCA_PROJECTION_PREFIX}'
30 # If trajectory frames number is bigger than the limit we create a reduced trajectory
31 pca_trajectory_filepath, step, frames = get_reduced_trajectory(
32 structure_file,
33 trajectory_file,
34 snapshots,
35 frames_limit,
36 )
38 # Parse the string selections
39 # VMD selection syntax
40 parsed_fit_selection = structure.select(pca_fit_selection, syntax='vmd')
41 if not parsed_fit_selection:
42 print(f' PCA fit selection: {pca_fit_selection}')
43 warn('PCA fit selection is empty -> Using all heavy atoms in the structure instead')
44 parsed_fit_selection = structure.select_heavy_atoms()
45 parsed_analysis_selection = structure.select(pca_analysis_selection, syntax='vmd')
46 if not parsed_analysis_selection:
47 print(f' PCA analysis selection: {pca_analysis_selection}')
48 warn('PCA analysis selection is empty -> Using all heavy atoms in the structure instead')
49 parsed_analysis_selection = structure.select_heavy_atoms()
51 # Parse PBC residues into a selection to be removed from PCA selections
52 parsed_fit_selection -= pbc_selection
53 parsed_analysis_selection -= pbc_selection
55 # If no selection is available at this point then stop here
56 # This may happen when the whole system is in PCB
57 if not parsed_analysis_selection:
58 print(' No selection to do PCA')
59 return
61 # Load the trajectory
62 mdtraj_trajectory = mdt.load(pca_trajectory_filepath, top=structure_file.path)
63 # Fit the trajectory according to the specified fit selection
64 mdtraj_trajectory.superpose(mdtraj_trajectory, frame=0, atom_indices=parsed_fit_selection.atom_indices)
65 # Filter the atoms to be analized
66 atom_indices = parsed_analysis_selection.atom_indices
67 mdtraj_trajectory = mdtraj_trajectory.atom_slice(atom_indices=atom_indices)
68 # Reshape data to a sklearn-friendly format
69 frames_number = mdtraj_trajectory.n_frames
70 atoms_number = mdtraj_trajectory.n_atoms
71 reshape = mdtraj_trajectory.xyz.reshape(frames_number, atoms_number * 3)
73 # Perform the PCA using sklearn
74 pca = PCA()
75 transformed = pca.fit_transform(reshape)
76 # DANI: Al transponer esto las proyecciones parece que tienen más sentido
77 transformed = transformed.transpose()
79 # Get eigenvalues
80 # Multiply values by 100 since values are in namometers (squared) and we want Ångstroms
81 eigenvalues = [ ev * 100 for ev in pca.explained_variance_ ]
82 # Get eigenvectors
83 eigenvectors = [ [ float(v) for v in eigenvector ] for eigenvector in pca.components_ ]
85 # Get the mean structure coordinates
86 mean = pca.mean_.flatten()
88 # Get the total explained variance by adding all eigenvalues
89 total = sum(eigenvalues)
91 # Now get the projection for those eigenvectors whose eigenvalue is greater than 1% of the total explained variance
92 # Eigenvalues are ordered from greater to lower by default, so we stop at the first value lower than 1%
93 # Save projections from those eigenvectors
94 projections = []
95 cutoff = total / 100
96 for i, value in enumerate(eigenvalues):
97 if value < cutoff:
98 break
99 # This logic was copied from here:
100 # https://userguide.mdanalysis.org/stable/examples/analysis/reduced_dimensions/pca.html
101 eigenvector = eigenvectors[i]
102 frame_projections = transformed[i]
103 offset = np.outer(frame_projections, eigenvector)
104 trajectory_projection = mean + offset
105 coordinates = trajectory_projection.reshape(len(frame_projections), -1, 3)
106 # Now we have the time dependent projection of the principal component
107 # However, we will sort frames according to the projection value
108 # In addition we will take only a few frames
109 max_projection = max(frame_projections)
110 min_projection = min(frame_projections)
111 projection_step = (max_projection - min_projection) / (projection_frames - 1)
112 selected_projections = [ min_projection + projection_step * s for s in range(projection_frames) ]
113 selected_coordinates = [ coordinates[get_closer_value_index(frame_projections, p)] for p in selected_projections ]
114 # Load coordinates in mdtraj and export the trajectory to xtc
115 trajectory_projection = mdt.Trajectory(selected_coordinates, mdtraj_trajectory.topology)
116 trajectory_projection_filename = output_trajectory_projections_prefix + '_' + str(i+1).zfill(2) + '.xtc'
117 trajectory_projection.save_xtc(trajectory_projection_filename)
118 # Save projections to be further exported to json
119 projections.append([ float(v) for v in frame_projections ])
121 # DANI: Usa esto para generar una estructura (pdb) que te permita visualizar las trajectory projections
122 #trajectory_projection[0].save_pdb('pca.trajectory_projection_structure.pdb')
124 # Set the output dict
125 data = {
126 'framestep': step,
127 'atoms': atom_indices,
128 'eigenvalues': eigenvalues,
129 'projections': projections
130 }
131 # Finally, export the analysis in json format
132 save_json({'data': data}, output_analysis_filepath)
134# Given an array with numbers, get the index of the value which is closer
135def get_closer_value_index (list : list, value : float) -> int:
136 distances = [ abs(value-v) for v in list ]
137 return min(range(len(distances)), key=distances.__getitem__)