Source code for grassp.tools.integration

from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from anndata import AnnData
    from typing import List, Sequence

from functools import reduce

import numpy as np
import umap


def align_adatas(
    data_list: List[AnnData],
    intersect_obs: bool = True,
    intersect_var: bool = True,
) -> List[AnnData]:
    """Return copies of several ``AnnData`` objects with matching index/columns.

    Parameters
    ----------
    data_list
        Sequence of :class:`~anndata.AnnData` instances to be trimmed.
    intersect_obs
        If *True* (default) keep only the intersection of *obs_names*.
    intersect_var
        If *True* (default) keep only the intersection of *var_names*.

    Returns
    -------
    list[AnnData]
        Aligned **copies** preserving the order of *data_list*.
    """
    if intersect_obs:
        obs = [data.obs_names for data in data_list]
        obs_intersect = reduce(lambda x, y: x.intersection(y), obs)
    else:
        obs_intersect = None
    if intersect_var:
        var = [data.var_names for data in data_list]
        var_intersect = reduce(lambda x, y: x.intersection(y), var)
    else:
        var_intersect = None

    data_sub_list = []
    for data in data_list:
        data_sub = data.copy()
        if var_intersect is not None:
            data_sub = data_sub[:, var_intersect]
        if obs_intersect is not None:
            data_sub = data_sub[obs_intersect, :]
        data_sub_list.append(data_sub)
    return data_sub_list
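
# Example usage (a minimal sketch; ``adata_a`` and ``adata_b`` are hypothetical
# AnnData objects that share a subset of proteins and fractions):
#
#     from grassp.tools.integration import align_adatas
#
#     adata_a, adata_b = align_adatas([adata_a, adata_b])
#     # Both copies now share identical, identically ordered indices.
#     assert adata_a.obs_names.equals(adata_b.obs_names)
#     assert adata_a.var_names.equals(adata_b.var_names)
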
def aligned_umap(
    data_list: List[AnnData],
    align_data: bool = True,
    return_data_objects: bool = True,
    n_neighbors: int = 20,
    metric: str = "euclidean",
    min_dist: float = 0.1,
    alignment_regularisation: float = 0.002,
    n_epochs: int = 300,
    random_state: int | None = None,
    verbose: bool = False,
    n_components: int = 2,
) -> List[AnnData] | umap.AlignedUMAP:
    """Aligned UMAP embedding for *matched* datasets.

    Wraps :class:`umap.AlignedUMAP` and writes the resulting coordinates to
    ``.obsm['X_aligned_umap']`` of each input object.

    Parameters
    ----------
    data_list
        List of :class:`~anndata.AnnData` objects assumed to contain the
        *same* observations and variables (intersected first if *align_data*
        is ``True``).
    align_data
        Whether to call :func:`align_adatas` before fitting.
    return_data_objects
        If ``True`` (default) return the modified list; otherwise return the
        fitted :class:`umap.AlignedUMAP` estimator.
    n_neighbors
        Number of nearest neighbours used to construct the UMAP graph.
    metric
        Distance metric used by UMAP (*e.g.* ``"euclidean"``).
    min_dist
        Minimum separation of points in the low-dimensional space.
    alignment_regularisation
        Weight of the between-dataset alignment objective.
    n_epochs
        Number of training epochs.
    n_components
        Dimensionality of the embedding (usually ``2``).
    random_state, verbose
        Passed directly to :class:`umap.AlignedUMAP`.

    Returns
    -------
    list[AnnData] | umap.AlignedUMAP
        Depending on *return_data_objects*.
    """
    # Make sure all AnnData objects have the same var_names and obs_names
    if align_data:
        data_sub_list = align_adatas(data_list)
    else:
        data_sub_list = data_list
    var_names = [data.var_names for data in data_sub_list]
    assert all(var_names[0].equals(index) for index in var_names[1:])
    obs_names = [data.obs_names for data in data_sub_list]
    assert all(obs_names[0].equals(index) for index in obs_names[1:])

    embeddings = [data.X for data in data_sub_list]
    # Observations are matched one-to-one across datasets, so the relation
    # between consecutive embeddings is the identity mapping.
    constant_relation = {i: i for i in range(data_sub_list[0].shape[0])}
    constant_relations = [constant_relation.copy() for _ in range(len(embeddings) - 1)]
    neighbors_mapper = umap.AlignedUMAP(
        n_neighbors=n_neighbors,
        metric=metric,
        min_dist=min_dist,
        alignment_regularisation=alignment_regularisation,
        n_epochs=n_epochs,
        random_state=random_state,
        verbose=verbose,
        n_components=n_components,
    ).fit(embeddings, relations=constant_relations)

    if return_data_objects:
        for i, data in enumerate(data_sub_list):
            data.obsm["X_aligned_umap"] = neighbors_mapper.embeddings_[i]
            data.uns["aligned_umap"] = {
                "params": {
                    "a": neighbors_mapper.mappers_[i].a,
                    "b": neighbors_mapper.mappers_[i].b,
                },
                "alignment_params": {
                    "alignment_regularisation": neighbors_mapper.alignment_regularisation,
                    "n_epochs": neighbors_mapper.n_epochs,
                    "random_state": neighbors_mapper.random_state,
                    "n_components": neighbors_mapper.n_components,
                },
            }
        return data_sub_list
    else:
        return neighbors_mapper
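
# Example usage (a sketch; ``adata_ctrl`` and ``adata_treat`` are hypothetical
# AnnData objects holding, e.g., control and treatment gradient profiles):
#
#     from grassp.tools.integration import aligned_umap
#
#     adatas = aligned_umap([adata_ctrl, adata_treat], random_state=0)
#     coords_ctrl = adatas[0].obsm["X_aligned_umap"]   # (n_obs, 2) array
#     coords_treat = adatas[1].obsm["X_aligned_umap"]  # rows match coords_ctrl
#
# Because the datasets are intersected first, row *i* of both embeddings
# refers to the same protein, which is what makes the coordinates comparable.
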
def _remodeling_score(
    embeddings: Sequence[np.ndarray],
) -> np.ndarray:
    assert len(embeddings) == 2
    distances = embeddings[0] - embeddings[1]
    return np.linalg.norm(distances, axis=1)
def remodeling_score(
    data_list: List[AnnData],
    aligned_umap_key: str = "X_aligned_umap",
    key_added: str = "remodeling_score",
) -> List[AnnData]:
    """Compute per-protein *remodeling score* from two aligned datasets.

    The remodeling score is the Euclidean distance between a protein's
    coordinates in two aligned UMAP embeddings.

    Parameters
    ----------
    data_list
        Exactly two :class:`~anndata.AnnData` objects with aligned
        coordinates in ``.obsm[aligned_umap_key]``.
    aligned_umap_key
        Key where the embedding is stored.
    key_added
        Observation key used to record the scores.

    Returns
    -------
    list[AnnData]
        The same list with an added ``.obs[key_added]`` column.
    """
    embeddings = [data.obsm[aligned_umap_key] for data in data_list]
    scores = _remodeling_score(embeddings)
    for data in data_list:
        data.obs[key_added] = scores
    return data_list
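
# Example usage (continuing the hypothetical ``adatas`` list from above;
# requires that ``aligned_umap`` has already populated
# ``.obsm["X_aligned_umap"]`` in both objects):
#
#     from grassp.tools.integration import remodeling_score
#
#     adatas = remodeling_score(adatas)
#     adatas[0].obs["remodeling_score"].nlargest(10)  # strongest candidates
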
def mr_score(
    data: AnnData,
    condition_key: str,
    replicate_key: str,
    mcd_proportion: float = 0.75,
    assume_centered: bool = True,
    n_iterations: int = 11,
    key_added: str = "mr_scores",
) -> None:
    """*M*/*R* score for detecting protein translocation.

    Implements the approach from Itzhak *et al.* 2016 where *M* measures the
    magnitude of movement (a robust Mahalanobis distance computed via the
    Minimum Covariance Determinant) and *R* the reproducibility of the
    difference profiles between replicates.

    Parameters
    ----------
    data
        AnnData object containing all conditions/replicates stacked in
        variables.
    condition_key
        Column in ``data.var`` indicating the experimental condition
        (e.g. *control*, *treatment*).
    replicate_key
        Column in ``data.var`` indicating the biological replicate.
    mcd_proportion, assume_centered
        Forwarded to :class:`sklearn.covariance.MinCovDet` as
        ``support_fraction`` and ``assume_centered``.
    n_iterations
        Number of MCD fits (with different random seeds) whose p-values are
        aggregated by the median.
    key_added
        Base name for the two new columns ``{key_added}_M`` and
        ``{key_added}_R`` written to ``data.obs``.
    """
    from scipy.stats import chi2
    from sklearn.covariance import MinCovDet
    from statsmodels.stats.multitest import multipletests

    # Check inputs
    if condition_key not in data.var:
        raise ValueError(f"Condition key {condition_key} not found in data.var")
    if replicate_key not in data.var:
        raise ValueError(f"Replicate key {replicate_key} not found in data.var")
    conditions = data.var[condition_key].unique()
    if len(conditions) != 2:
        raise ValueError("Exactly 2 conditions are required")
    control_name = conditions[0]
    treatment_name = conditions[1]

    # Get unique replicates
    replicates = data.var[replicate_key].unique()
    if len(replicates) != 3:
        raise ValueError("Exactly 3 biological replicates are required")

    # Calculate difference profiles for each replicate
    diff_profiles = []
    for rep in replicates:
        control_mask = (data.var[condition_key] == control_name) & (
            data.var[replicate_key] == rep
        )
        treat_mask = (data.var[condition_key] == treatment_name) & (
            data.var[replicate_key] == rep
        )
        # @TODO This assumes that the samples have the same order
        diff = data[:, control_mask].X - data[:, treat_mask].X
        diff_profiles.append(diff)

    # Calculate M scores
    p_values = np.zeros((data.n_obs, len(replicates), n_iterations))
    for i in range(n_iterations):
        for j, diff in enumerate(diff_profiles):
            # Robust Mahalanobis distance calculation
            mcd = MinCovDet(
                support_fraction=mcd_proportion,
                random_state=i,
                assume_centered=assume_centered,
            )
            mcd.fit(diff)
            # ``mahalanobis`` already centers on ``mcd.location_`` (zero when
            # ``assume_centered=True``), so no extra centering is needed.
            distances = mcd.mahalanobis(diff)
            # Convert to p-values using a chi-square distribution with one
            # degree of freedom per fraction
            p_values[:, j, i] = chi2.sf(distances, df=diff.shape[1])
    p_values = p_values.astype(np.longdouble)
    p_values = np.median(p_values, axis=2)

    # Take the highest p-value (lowest M score) across replicates for each protein
    max_p_values = np.max(p_values, axis=1)

    # Cube the p-values and apply Benjamini-Hochberg correction
    p_values = np.power(max_p_values, 3)
    _, p_values_adj, _, _ = multipletests(p_values, method="fdr_bh")
    # Add a small epsilon to avoid log(0)
    final_m_scores = -np.log10(p_values_adj + np.finfo(np.float64).tiny).astype(np.float64)

    # Calculate R scores (reproducibility)
    r_scores = np.zeros(data.n_obs)
    for i in range(data.n_obs):
        correlations = []
        # Correlations between all pairs of replicates
        for j in range(len(replicates)):
            for k in range(j + 1, len(replicates)):
                corr = np.corrcoef(diff_profiles[j][i], diff_profiles[k][i])[0, 1]
                correlations.append(corr)
        # Take the lowest correlation as the R score
        r_scores[i] = np.min(correlations)

    # Store results
    data.obs[f"{key_added}_M"] = final_m_scores
    data.obs[f"{key_added}_R"] = r_scores
    data.uns[key_added] = {
        "params": {
            "mcd_proportion": mcd_proportion,
            "n_iterations": n_iterations,
            "control_name": control_name,
            "treatment_name": treatment_name,
        },
    }
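
# Example usage (a sketch; ``adata`` is a hypothetical AnnData with proteins
# as observations and gradient-fraction samples as variables, whose
# ``adata.var`` carries "condition" and "replicate" columns covering exactly
# 2 conditions x 3 replicates):
#
#     from grassp.tools.integration import mr_score
#
#     mr_score(adata, condition_key="condition", replicate_key="replicate")
#     # Illustrative cutoffs only; choose thresholds for your own data.
#     hits = adata.obs.query("mr_scores_M > 2 and mr_scores_R > 0.8")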