from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import numpy as np
from anndata import AnnData
import sklearn.metrics
import warnings
import numpy as np
import pandas as pd
def class_balance(
    data: AnnData, label_key: str, min_class_size: int = 10, seed: int = 42
) -> AnnData:
    """Balance classes by downsampling each one to the smallest class size.

    Observations with a missing label are dropped first; every remaining
    class is then subsampled (without replacement) to the size of the
    smallest class.

    Parameters
    ----------
    data : AnnData
        Annotated data matrix.
    label_key : str
        Key in data.obs containing cluster labels.
    min_class_size : int, optional
        Minimum number of samples per class. Defaults to 10.
    seed : int, optional
        Random seed used for the per-class subsampling. Defaults to 42.

    Returns
    -------
    AnnData
        View of ``data`` restricted to the class-balanced subset.

    Raises
    ------
    ValueError
        If ``label_key`` is not in ``data.obs`` or the smallest class has
        fewer than ``min_class_size`` samples.
    """
    # Check if label_key is in adata.obs
    if label_key not in data.obs.columns:
        raise ValueError(f"Label key {label_key} not found in adata.obs")
    # Remove all samples with missing labels
    data_sub = data[data.obs[label_key].notna()]
    # Compute class sizes once (the original computed value_counts twice).
    class_counts = data_sub.obs[label_key].value_counts()
    min_class_s = class_counts.min()
    min_class = class_counts.idxmin()
    if min_class_s < min_class_size:
        raise ValueError(
            f"Smallest class ({min_class}) has less than {min_class_size} samples."
        )
    # Only reachable when min_class_size was lowered below the default of 10.
    if min_class_s < 10:
        warnings.warn(
            f"Smallest class ({min_class}) has less than 10 samples, this might not yield a stable score."
        )
    # Downsample every class to the size of the smallest one.
    obs_names = []
    for label in data_sub.obs[label_key].unique():
        obs_names.extend(
            data_sub.obs[data_sub.obs[label_key] == label]
            .sample(min_class_s, replace=False, random_state=seed)
            .index.values
        )
    return data_sub[obs_names, :]
def silhouette_score(
    data, gt_col, use_rep="X_umap", key_added="silhouette", inplace=True
) -> None | np.ndarray:
    """Calculate silhouette scores for clustered data.

    Parameters
    ----------
    data : AnnData
        Annotated data matrix.
    gt_col : str
        Column name in data.obs containing cluster labels.
    use_rep : str, optional
        Key for representation in data.obsm to use for score calculation.
        Defaults to 'X_umap'.
    key_added : str, optional
        Key under which to add the silhouette scores. Defaults to 'silhouette'.
    inplace : bool, optional
        If True, store results in data, else return scores. Defaults to True.

    Returns
    -------
    None or ndarray
        If inplace=True, returns None and stores results in data.
        If inplace=False, returns array of silhouette scores.
    """
    # Restrict the computation to observations that actually carry a label.
    labelled = data.obs[gt_col].notna()
    subset = data[labelled]
    obs_frame = subset.obs.copy()
    scores = sklearn.metrics.silhouette_samples(
        subset.obsm[use_rep], obs_frame[gt_col]
    )
    if not inplace:
        return scores
    # Per-cluster means give a class-balanced aggregate alongside the plain mean.
    obs_frame[key_added] = scores
    per_cluster = obs_frame.groupby(gt_col)[key_added].mean()
    data.uns[key_added] = {
        "mean_silhouette_score": scores.mean(),
        "cluster_mean_silhouette": per_cluster.to_dict(),
        "cluster_balanced_silhouette_score": per_cluster.mean(),
    }
    # Write per-observation scores back; unlabelled rows stay NaN.
    data.obs.loc[labelled, key_added] = scores
def calinski_habarasz_score(
    data,
    gt_col,
    use_rep="X_umap",
    key_added="ch_score",
    class_balance=False,
    inplace=True,
    seed=42,
) -> None | float:
    """Calculate Calinski-Harabasz score for clustered data.

    Parameters
    ----------
    data : AnnData
        Annotated data matrix.
    gt_col : str
        Column name in data.obs containing cluster labels.
    use_rep : str, optional
        Key for representation in data.obsm to use for score calculation.
        Defaults to 'X_umap'.
    key_added : str, optional
        Key under which to add the score. Defaults to 'ch_score'.
    class_balance : bool, optional
        If True, downsample every class to the size of the smallest class
        before computing the score. Defaults to False.
    inplace : bool, optional
        If True, store results in data, else return score. Defaults to True.
    seed : int, optional
        Random seed used when class balancing. Defaults to 42.

    Returns
    -------
    None or float
        If inplace=True, returns None and stores result in data.
        If inplace=False, returns the Calinski-Harabasz score.
    """
    # Restrict to labelled observations.
    mask = data.obs[gt_col].notna()
    data_sub = data[mask]
    if class_balance:
        min_class_size = data_sub.obs[gt_col].value_counts().min()
        if min_class_size < 10:
            warnings.warn(
                "Smallest class has less than 10 samples, this might not yield a stable score."
            )
        # Downsample every class to the smallest class size.
        obs_names = []
        for label in data_sub.obs[gt_col].unique():
            obs_names.extend(
                data_sub.obs[data_sub.obs[gt_col] == label]
                .sample(min_class_size, replace=False, random_state=seed)
                .index.values
            )
        data_sub = data_sub[obs_names, :]
    ch = sklearn.metrics.calinski_harabasz_score(
        data_sub.obsm[use_rep], data_sub.obs[gt_col]
    )
    if inplace:
        data.uns[key_added] = ch
    else:
        return ch
def qsep_score(
    data: AnnData,
    label_key: str,
    use_rep: str = "X",
    distance_key: str = "full_distances",
    inplace: bool = True,
) -> None | np.ndarray:
    """Calculate QSep scores for spatial proteomics data.

    Parameters
    ----------
    data : AnnData
        Annotated data matrix.
    label_key : str
        Key in data.obs containing cluster labels.
    use_rep : str, optional
        Key for representation to use for distance calculation.
        Either 'X' or a key in data.obsm. Defaults to 'X'.
    distance_key : str, optional
        Key under which to store the full distances in data.obs.
        Defaults to 'full_distances'.
    inplace : bool, optional
        If True, store results in data, else return matrices.
        Defaults to True.

    Returns
    -------
    None or np.ndarray
        If inplace=True, returns None and stores results in data.
        If inplace=False, returns cluster_distances.
    """
    # Pick the representation to measure distances in.
    matrix = data.X if use_rep == "X" else data.obsm[use_rep]
    # All pairwise point-to-point distances.
    pairwise = sklearn.metrics.pairwise_distances(matrix)
    # Clusters with a non-NA label; NA-labelled rows never match any cluster.
    labelled = data.obs[label_key].notna()
    clusters = data.obs[label_key][labelled].unique()
    members = {
        cluster: np.where(data.obs[label_key] == cluster)[0]
        for cluster in clusters
    }
    # Mean inter-point distance for every ordered pair of clusters.
    cluster_distances = np.zeros((len(clusters), len(clusters)))
    for i, first in enumerate(clusters):
        rows = members[first]
        for j, second in enumerate(clusters):
            block = pairwise[np.ix_(rows, members[second])]
            cluster_distances[i, j] = block.mean()
    if not inplace:
        return cluster_distances
    # Per-observation mean distance to every other observation.
    data.obs[distance_key] = pd.Series(
        pairwise.mean(axis=1), index=data.obs.index
    )
    # Cluster-level distance matrix plus the cluster order it refers to.
    data.uns["cluster_distances"] = {
        "distances": cluster_distances,
        "clusters": clusters.tolist(),
    }