|
| 1 | +from pathlib import Path |
| 2 | + |
| 3 | +import pandas as pd |
| 4 | +from anndata import AnnData |
| 5 | + |
| 6 | + |
def _read_annotations_file(path: Path, join_column: str | None) -> pd.DataFrame:
    """Read a delimited annotation table from ``path``.

    The separator is a tab for ``.tsv``/``.txt`` files and a comma otherwise.
    The check is case-insensitive and ignores one trailing compression suffix
    (e.g. ``table.tsv.gz``). When ``join_column`` is None the first column is
    used as the index, so that it can be matched against observation names.
    """
    suffixes = [s.lower() for s in path.suffixes]
    if suffixes and suffixes[-1] in {".gz", ".bz2", ".xz", ".zip", ".zst"}:
        suffixes.pop()
    format_suffix = suffixes[-1] if suffixes else ""
    sep = "\t" if format_suffix in (".tsv", ".txt") else ","
    # Without an explicit join column, a file has no meaningful index of its
    # own; the default RangeIndex would never match adata.obs_names.
    index_col = 0 if join_column is None else None
    return pd.read_csv(path, sep=sep, index_col=index_col)


def row_annotations(
    adata: AnnData,
    annotations: pd.DataFrame | str | Path,
    join_column: str | None = None,
    columns: list[str] | None = None,
    copy: bool = False,
) -> AnnData | None:
    """\
    Add annotations to adata.obs by joining on cell/spot identifiers.

    Merges a DataFrame (or CSV/TSV file) into adata.obs based on a
    shared index or column. Useful for adding metadata such as
    manual labels, clinical annotations, or external classifications.
    Observations without a matching annotation row receive missing values.

    Parameters
    ----------
    adata
        Annotated data matrix.
    annotations
        DataFrame or path to a CSV/TSV file containing annotations.
        A DataFrame passed in is never modified.
    join_column
        Column in annotations to join on. If None, uses the
        DataFrame index (for a file: its first column). The join is
        always against adata.obs_names; identifiers are compared as
        strings.
    columns
        Subset of columns to add. If None, adds all columns
        (excluding join_column).
    copy
        Return a copy instead of writing to adata.

    Returns
    -------
    Depending on `copy`, returns or updates `adata` with new
    columns added to `adata.obs`.

    Raises
    ------
    ValueError
        If `join_column` or any of `columns` is missing from the
        annotations, or if the join identifiers are not unique.
    """
    if isinstance(annotations, (str, Path)):
        annotations = _read_annotations_file(Path(annotations), join_column)

    if join_column is not None:
        if join_column not in annotations.columns:
            raise ValueError(
                f"Column '{join_column}' not found. "
                f"Available: {list(annotations.columns)}"
            )
        annotations = annotations.set_index(join_column)

    if columns is not None:
        missing = [c for c in columns if c not in annotations.columns]
        if missing:
            raise ValueError(f"Columns not found: {missing}")
        annotations = annotations[columns]

    # obs_names are strings, so numeric identifiers (e.g. parsed from a CSV)
    # must be compared as strings. set_axis returns a new frame, leaving the
    # caller's DataFrame untouched.
    if not isinstance(annotations.index, pd.MultiIndex):
        annotations = annotations.set_axis(annotations.index.astype(str), axis=0)

    if annotations.index.has_duplicates:
        dup_mask = annotations.index.duplicated()
        duplicated = annotations.index[dup_mask].unique()[:5].tolist()
        raise ValueError(
            "Annotation identifiers must be unique; "
            f"duplicated (first 5): {duplicated}"
        )

    # Copy only after validation so invalid input does not cost a full copy.
    adata = adata.copy() if copy else adata

    merged = annotations.reindex(adata.obs_names)

    # Count by identifier membership: a matched row whose annotation values
    # are all missing is still a match.
    n_matched = int(adata.obs_names.isin(annotations.index).sum())
    added_cols = list(merged.columns)

    for col in added_cols:
        # .values drops the index so assignment is positional, not aligned.
        adata.obs[col] = merged[col].values

    print(
        f"Added {len(added_cols)} column(s) to adata.obs: {added_cols}. "
        f"{n_matched}/{adata.n_obs} cells/spots matched."
    )

    return adata if copy else None
0 commit comments