"""
Module that defines the `Otu` object and methods to manipulate it
"""
import json
import pathlib
from typing import Callable, Dict, Hashable, Iterable, List, Optional, Tuple
import numpy as np
import pandas as pd
from biom import Table
from biom.util import biom_open
from ..validation import BiomType, ObsmetaType, OtuValidator, SamplemetaType
from .lineage import Lineage
Filterfun = Callable[[np.ndarray, str, dict], bool]
Hashfun = Callable[[str, dict], Hashable]
class Otu:
"""
An object that represents the OTU counts table
Parameters
----------
otu_data : Table
`biom.Table` object containing OTU data
sample_metadata : pd.DataFrame, optional
`pd.DataFrame` containing metadata for the samples
obs_metadata : pd.DataFrame, optional
`pd.DataFrame` containing metadata for the observations (OTUs)
Attributes
----------
otu_data : biom.Table
OTU counts table in the `biom.Table` format
sample_metadata : pd.DataFrame
Metadata for the samples
obs_metadata : pd.DataFrame
Lineage data for the observations (OTUs)
tax_level : str
The taxonomy level of the current Otu instance
Notes
-----
All methods that manipulate the Otu object return new objects
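    Examples
    --------
    A minimal construction sketch; the counts, ids, and metadata here are
    hypothetical, and the validators may reject tables whose metadata does not
    follow the expected schema:
    >>> import numpy as np
    >>> from biom import Table
    >>> counts = Table(
    ...     np.array([[5, 0], [3, 8]]), ["otu1", "otu2"], ["s1", "s2"]
    ... )
    >>> otu = Otu(counts)  # doctest: +SKIP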
"""
def __init__(
self,
otu_data: Table,
sample_metadata: Optional[pd.DataFrame] = None,
obs_metadata: Optional[pd.DataFrame] = None,
) -> None:
if not isinstance(otu_data, Table):
raise TypeError("Otu data must be of type `biom.Table`")
otu_data_copy = otu_data.copy()
if sample_metadata is not None:
samplemeta_type = SamplemetaType()
samplemeta_type.validate(sample_metadata)
otu_data_copy.add_metadata(
sample_metadata.to_dict(orient="index"), axis="sample"
)
if obs_metadata is not None:
obsmeta_type = ObsmetaType()
obsmeta_type.validate(obs_metadata)
otu_data_copy.add_metadata(
obs_metadata.to_dict(orient="index"), axis="observation"
)
biom_type = BiomType()
biom_type.validate(otu_data_copy)
self.otu_data = otu_data_copy
def __repr__(self) -> str:
n_obs, n_samples = self.otu_data.shape
return f"<Otu {n_obs}obs x {n_samples}samples>"
    @classmethod
def load_data(
cls,
otu_file: str,
meta_file: Optional[str] = None,
tax_file: Optional[str] = None,
dtype: str = "biom",
ext: Optional[str] = None,
) -> "Otu":
"""
Load data from files into the `Otu` class instance
Parameters
----------
otu_file : str
The path to the `OTU` counts file
meta_file : str, optional
The path to the sample metadata file
tax_file : str, optional
The path to the taxonomy file
dtype : {'biom', 'tsv'}
            The type of the input OTU file
ext : str, optional
The extension of the file if other than supported extensions
Supported extensions:
- 'tsv' dtype: 'tsv', 'txt', 'counts'
- 'biom' dtype: 'biom', 'hdf5'
Returns
-------
Otu
An instance of the `Otu` class
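        Examples
        --------
        A usage sketch; the file paths here are hypothetical:
        >>> otu = Otu.load_data("otu_counts.biom")  # doctest: +SKIP
        >>> otu = Otu.load_data(
        ...     "otu_counts.tsv",
        ...     meta_file="sample_metadata.tsv",
        ...     tax_file="taxonomy.csv",
        ...     dtype="tsv",
        ... )  # doctest: +SKIP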
"""
otu_validator = OtuValidator(dtype=dtype, ext=ext)
otu_path = pathlib.Path(otu_file)
if otu_path.exists():
meta_path = pathlib.Path(meta_file) if meta_file is not None else meta_file
tax_path = pathlib.Path(tax_file) if tax_file is not None else tax_file
otu_data = otu_validator.load_validate(otu_path, meta_path, tax_path)
else:
            raise FileNotFoundError(f"Missing input file: {otu_file}")
return cls(otu_data)
@property
def sample_metadata(self) -> pd.DataFrame:
"""
Metadata for the samples
Returns
-------
pd.DataFrame
"""
return self.otu_data.metadata_to_dataframe("sample")
@property
def obs_metadata(self) -> pd.DataFrame:
"""Lineage data for the observations (OTUs)"""
obs_metadata = self.otu_data.metadata_to_dataframe("observation")
lineage = list(Lineage._fields)
n_tax_levels = len(set(obs_metadata.columns) & set(lineage))
lineage_columns = lineage[:n_tax_levels]
if extra_columns := list(set(obs_metadata.columns) - set(lineage_columns)):
columns: List[str] = lineage_columns + extra_columns
else:
columns = lineage_columns
return obs_metadata[columns]
@property
def tax_level(self) -> str:
"""
Returns the taxonomy level of the Otu instance
Returns
-------
str
            The lowest taxonomy level defined in the Otu instance
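        Examples
        --------
        A sketch, assuming `otu` is a hypothetical instance whose lineages are
        resolved down to the genus level:
        >>> otu.tax_level  # doctest: +SKIP
        'Genus'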
"""
obs_metadata = self.otu_data.metadata_to_dataframe("observation")
lineage = list(Lineage._fields)
n_tax_levels = len(set(obs_metadata.columns) & set(lineage))
return Lineage._fields[n_tax_levels - 1]
    def filter(
self,
ids: Optional[Iterable[str]] = None,
func: Optional[Filterfun] = None,
axis: str = "observation",
) -> "Otu":
"""
Filter Otu instance based on ids or func
Parameters
----------
ids : Iterable[str], optional
An iterable of ids to keep.
            If `ids` is not supplied, then `func` must be supplied
        func : Callable[[np.ndarray, str, dict], bool], optional
            A function that takes in (values, id_ind, md) and returns a bool
            If `func` is not supplied, then `ids` must be supplied
            If both `ids` and `func` are supplied, `ids` takes precedence
axis : {'sample', 'observation'}, optional
The axis along which to filter the Otu instance
Default value is 'observation'
Returns
-------
Otu
Filtered Otu instance
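        Examples
        --------
        A sketch of both filtering modes; `otu` and the ids are hypothetical:
        >>> subset = otu.filter(ids=["otu1", "otu2"], axis="observation")  # doctest: +SKIP
        >>> nonzero = otu.filter(
        ...     func=lambda values, id_ind, md: values.sum() > 0,
        ...     axis="observation",
        ... )  # doctest: +SKIP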
"""
if ids is not None:
otu_filtered = self.otu_data.filter(ids, inplace=False, axis=axis)
        elif func is not None:
otu_filtered = self.otu_data.filter(func, inplace=False, axis=axis)
else:
raise TypeError("Either ids or func must be supplied")
return Otu(otu_filtered)
    def normalize(self, axis: str = "sample", method: str = "norm") -> "Otu":
"""
Normalize the OTU table along the provided axis
Parameters
----------
axis : {'sample', 'observation'}, optional
Axis along which to normalize the OTU table
Default is 'sample'
        method : {'norm', 'rarefy', 'css'}, optional
Normalization method to use
Returns
-------
Otu
Otu instance which is normalized along the given axis
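        Examples
        --------
        A sketch, assuming `otu` is a hypothetical instance holding raw counts:
        >>> norm_otu = otu.normalize(axis="sample", method="norm")  # doctest: +SKIP
        >>> norm_otu.is_norm(axis="sample")  # doctest: +SKIP
        True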
"""
if method == "norm":
norm_otu = self.otu_data.norm(axis=axis, inplace=False)
elif method in {"rarefy", "css"}:
raise NotImplementedError("This method is not implemented yet")
else:
raise ValueError(
"Invalid method. Supported methods are {'norm', 'rarefy', 'css'}"
)
return Otu(norm_otu)
    def is_norm(self, axis: str = "sample") -> bool:
"""
        Check whether the Otu instance is normalized along the given axis
        Parameters
        ----------
        axis : {'sample', 'observation'}, optional
            The axis along which to check for normalization
            Default value is 'sample'
        Returns
        -------
        bool
            True if the counts sum to 1.0 along the given axis
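        Examples
        --------
        A sketch; `otu` is a hypothetical instance holding raw counts:
        >>> otu.is_norm()  # doctest: +SKIP
        False
        >>> otu.normalize().is_norm()  # doctest: +SKIP
        True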
"""
otu_data = self.otu_data.to_dataframe()
if axis == "sample":
return bool(np.isclose(otu_data.sum(axis=0), 1.0).all())
if axis == "observation":
return bool(np.isclose(otu_data.sum(axis=1), 1.0).all())
raise ValueError("Axis must of either {'sample' or 'observation'}")
    def rm_sparse_samples(self, count_thres: int = 500) -> "Otu":
"""
Remove samples with read counts less than `count_thres`
Parameters
----------
count_thres : int, optional
Counts threshold below which samples are rejected
Default value is 500
Returns
-------
Otu
Otu instance with low count samples removed
Raises
------
ValueError
If Otu instance is normalized
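        Examples
        --------
        A sketch, assuming `otu` is a hypothetical instance of raw counts:
        >>> deep_otu = otu.rm_sparse_samples(count_thres=1000)  # doctest: +SKIP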
"""
if self.is_norm():
raise ValueError(
"Otu instance is normalized and hence will not work with this method"
)
filt_fun = lambda val, *_: round(val.sum()) >= count_thres
new_otu = self.otu_data.filter(filt_fun, axis="sample", inplace=False)
return Otu(new_otu)
    def rm_sparse_obs(
self,
prevalence_thres: float = 0.05,
abundance_thres: float = 0.01,
obssum_thres: int = 100,
) -> "Otu":
"""
        Remove observations that fall below the prevalence, abundance, or total count thresholds
Parameters
----------
prevalence_thres : float
            Minimum fraction of samples in which the observation must be present in order to be kept
abundance_thres : float
            Minimum relative abundance the observation must reach in at least one sample in order to be kept
obssum_thres : int
            The threshold applied to the total count of each observation across all samples
Returns
-------
Otu
            Otu instance with sparse observations merged into a single 'otu_merged' observation
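        Examples
        --------
        A sketch, assuming `otu` is a hypothetical instance of raw counts with
        lineage metadata attached:
        >>> dense_otu = otu.rm_sparse_obs(
        ...     prevalence_thres=0.1, abundance_thres=0.01, obssum_thres=100
        ... )  # doctest: +SKIP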
"""
prevalence_obssum_filter = lambda val, *_: (
(val.astype(int).astype(bool).mean()) >= prevalence_thres
) and ((val.astype(int).sum()) >= obssum_thres)
otu_prevalence_obssum_filtered = self.otu_data.filter(
prevalence_obssum_filter, axis="observation", inplace=False
)
otu_df = otu_prevalence_obssum_filtered.to_dataframe()
if otu_df.apply(pd.api.types.is_sparse).any():
otu_rel_abund = (otu_df / otu_df.sum(axis=0)).sparse.to_dense()
else:
otu_rel_abund = otu_df / otu_df.sum(axis=0)
ind_above_thres = otu_rel_abund.index[
(otu_rel_abund > abundance_thres).any(axis=1)
]
new_otu = self.otu_data.filter(
ind_above_thres, axis="observation", inplace=False
)
ind_below_thres = set(self.otu_data.ids("observation")) - set(ind_above_thres)
otu_sparse_obs = self.otu_data.filter(
ind_below_thres, axis="observation", inplace=False
)
new_row = Table(
otu_sparse_obs.sum(axis="sample"),
["otu_merged"],
self.otu_data.ids(axis="sample"),
)
tax_level = self.tax_level
random_row_metadata = dict(self.otu_data.metadata(axis="observation")[0]) # type: ignore
new_row.add_metadata(
{
"otu_merged": {
**random_row_metadata,
**Lineage("Unclassified").to_dict(tax_level),
}
},
axis="observation",
)
final_otu = new_otu.concat([new_row], axis="observation")
return Otu(final_otu)
    def partition(self, axis: str, func: Hashfun) -> Iterable[Tuple[str, "Otu"]]:
"""
Partition the Otu instance based on the func and axis
Parameters
----------
axis : str
The axis on which to partition
func : Callable[[str, dict], Hashable]
The function that takes in (id, metadata) and returns a hashable
Returns
-------
Iterable[Tuple[str, Otu]]
An iterable of tuples - ('label', Otu)
Notes
-----
1. To group by lineage "level" use:
func = lambda id_ind, md: Lineage(**md).get_superset(level)
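        Examples
        --------
        A sketch that partitions observations by phylum; `otu` is hypothetical
        and 'Phylum' is assumed to be one of the `Lineage` fields:
        >>> parts = otu.partition(
        ...     axis="observation",
        ...     func=lambda id_ind, md: Lineage(**md).get_superset("Phylum"),
        ... )  # doctest: +SKIP
        >>> for label, sub_otu in parts:  # doctest: +SKIP
        ...     print(label, sub_otu)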
"""
if axis == "observation" and self.is_norm(axis="sample"):
raise ValueError(
"Cannot partition sample normalized Otu instance on observation"
)
if axis == "sample" and self.is_norm(axis="observation"):
raise ValueError(
"Cannot partition observation normalized Otu instance on sample"
)
partitions = self.otu_data.partition(func, axis=axis)
for label, table in partitions:
yield label, Otu(table)
    def collapse_taxa(self, level: str) -> Tuple["Otu", Dict[str, List[str]]]:
"""
Collapse Otu instance based on taxa
Parameters
----------
level : str
The tax level of the collapsed table
This will also be used as the prefix for the unique ids
Returns
-------
        Tuple[Otu, Dict[str, List[str]]]
            The collapsed Otu instance and a dict mapping each new observation id
            to the list of original observation ids merged into it
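        Examples
        --------
        A sketch that collapses to the genus level; `otu` is hypothetical and
        'Genus' is assumed to be one of the `Lineage` fields:
        >>> genus_otu, children = otu.collapse_taxa(level="Genus")  # doctest: +SKIP
        >>> list(children)[:1]  # doctest: +SKIP
        ['Genus_0']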
"""
obs_metadata_cols = list(self.obs_metadata)
lineage_cols = list(Lineage._fields)
unwanted_obs_metadata_cols = list(set(obs_metadata_cols) - set(lineage_cols))
otu_data_copy = self.otu_data.copy()
otu_data_copy.del_metadata(keys=unwanted_obs_metadata_cols, axis="observation")
obs_metadata_copy = self.obs_metadata.drop(unwanted_obs_metadata_cols, axis=1)
if level not in Lineage._fields:
raise ValueError(f"level must be one of {Lineage._fields}")
cfunc = lambda id_ind, md: str(Lineage(**md).get_superset(level))
otu_collapse = otu_data_copy.collapse(
cfunc, axis="observation", norm=False, include_collapsed_metadata=True
)
curr_ids = otu_collapse.ids(axis="observation")
if otu_collapse_md := otu_collapse.metadata(axis="observation"):
children_group_list = [i["collapsed_ids"] for i in otu_collapse_md] # type: ignore
else:
raise ValueError("No metadata found in collapsed table")
children_groups = dict(zip(curr_ids, children_group_list))
otu_collapse.del_metadata(axis="observation")
afunc = lambda x: pd.Series(
Lineage(**x.to_dict()).get_superset(level).to_dict(level)
)
obs_collapse = obs_metadata_copy.apply(afunc, axis=1)
gfunc = lambda x: str(Lineage(**x.to_dict()))
obs_collapse.index = obs_collapse.apply(gfunc, axis=1) # type: ignore
obs_collapse.drop_duplicates(inplace=True)
otu_collapse.add_metadata(
obs_collapse.to_dict(orient="index"), axis="observation"
)
obs_dict = json.loads(
otu_collapse.metadata_to_dataframe("observation").to_json(orient="split")
)
unq_id_indmap = {k: f"{level}_{i}" for i, k in enumerate(curr_ids)}
obs_ids = [
unq_id_indmap[i] for i in obs_dict["index"]
] # get ordered unique ids
children_dict = {unq_id_indmap[k]: v for k, v in children_groups.items()}
sample_ids = otu_collapse.ids(axis="sample")
observation_metadata = otu_collapse.metadata_to_dataframe(
axis="observation"
).to_dict(orient="records")
sample_metadata = otu_collapse.metadata_to_dataframe(axis="sample").to_dict(
orient="records"
)
new_table = Table(
otu_collapse.matrix_data,
obs_ids,
sample_ids,
observation_metadata=observation_metadata,
sample_metadata=sample_metadata,
)
return Otu(new_table), children_dict
    def write(
self, base_name: str, fol_path: str = "", file_type: str = "biom"
) -> None:
"""
        Write the Otu instance to the requested file_type
Parameters
----------
base_name : str
The base name without extension to be used for the files
fol_path : str, optional
The folder where the files are to be written
Default is current directory
file_type : {'tsv', 'biom'}, optional
            The type of file to which the data is to be written
Default is 'biom'
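        Examples
        --------
        A sketch; the folder and base name here are hypothetical:
        >>> otu.write("my_counts", fol_path="results", file_type="biom")  # doctest: +SKIP
        >>> otu.write("my_counts", fol_path="results", file_type="tsv")  # doctest: +SKIP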
"""
folder = pathlib.Path(fol_path)
if not folder.exists():
            folder.mkdir(parents=True)
if file_type == "biom":
fname = base_name + ".biom"
fpath = str(folder / fname)
with biom_open(fpath, "w") as fid:
self.otu_data.to_hdf5(fid, "Constructed using micone")
elif file_type == "tsv":
otu_name = base_name + "_otu.tsv"
otu_path = folder / otu_name
with open(otu_path, "w") as fid:
# NOTE: delete header comment from output
data = self.otu_data.to_tsv().split("\n", 1)[-1]
fid.write(data)
sample_metadata_name = base_name + "_sample_metadata.tsv"
sample_metadata_path = folder / sample_metadata_name
self.sample_metadata.to_csv(sample_metadata_path, sep="\t", index=True)
obs_metadata_name = base_name + "_obs_metadata.csv"
obs_metadata_path = folder / obs_metadata_name
self.obs_metadata.to_csv(obs_metadata_path, index=True)
else:
raise ValueError("Supported file types are 'tsv' and 'biom'")