Source code for micone.main.lineage

"""
    Module that implements the `Lineage` class and  methods to work with taxonomy data
"""


from collections import namedtuple
from typing import Dict, Tuple
from warnings import warn

from ete3 import NCBITaxa

from ..logging import LOG

BaseLineage = namedtuple("Lineage", "Kingdom Phylum Class Order Family Genus Species")


[docs]class Lineage(BaseLineage): """ `NamedTuple` that stores the lineage of a taxon and methods to interact with it Attributes ---------- Kingdom: str Phylum: str Class: str Order: str Family: str Genus: str Species: str """ def __new__( cls, Kingdom: str = "", Phylum: str = "", Class: str = "", Order: str = "", Family: str = "", Genus: str = "", Species: str = "", ) -> "Lineage": tax_order = [Kingdom, Phylum, Class, Order, Family, Genus, Species] empty = [i for i, tax in enumerate(tax_order) if tax == ""] if empty and (len(tax_order) - empty[0] != len(empty)): warn( RuntimeWarning( f"Lower levels should not be filled if higher levels are empty: {tax_order}" ) ) norm_taxa = [cls._normalize_tax(i) for i in tax_order] cls._ncbi = NCBITaxa() return super().__new__(cls, *norm_taxa) @staticmethod def _normalize_tax(tax: str) -> str: """ Normalize taxonomy name by removing unwanted characters Parameters ---------- tax : str Returns ------- str Normalized taxonomy name """ return ( tax.strip() .replace("[", "") .replace("]", "") .replace("'", "") .replace("=", "") ) def __sub__(self, other: "Lineage") -> "Lineage": """ Returns the lineage that is in common between two lineages Parameters ---------- other : "Lineage" Returns ------- Lineage Common lineage """ for i, (s_lin, o_lin) in enumerate(zip(self, other)): if s_lin != o_lin: return Lineage(*self[:i]) return Lineage(*self._fields) @property def name(self) -> Tuple[str, str]: """ Get the lowest populated level and name of the taxon Returns ------- Tuple[str, str] Tuple containing (level, name) """ fields = self._fields for field in reversed(fields): ind = fields.index(field) name = self[ind] if name != "": return field, name return "Kingdom", "unclassified"
[docs] @classmethod def from_str(cls, lineage_str: str, style: str = "gg") -> "Lineage": """ Create `Lineage` instance from a lineage string Parameters ---------- lineage_str : str Lineage in the form of a string style : {'gg', 'silva'}, optional The style of the lineage string Default is 'gg' Returns ------- Lineage Instance of the `Lineage` class """ if style == "gg": if lineage_str.startswith("k"): tax_list = lineage_str.split(";") elif lineage_str.startswith("p"): tax_list = ["Bacteria"] + lineage_str.split(";") else: raise ValueError("Incompatible lineage string") elif style == "silva": if lineage_str.startswith("D_0"): tax_list = lineage_str.split(";D_7")[0].split(";") elif lineage_str.startswith("D_1"): tax_list = ["Bacteria"] + lineage_str.split(";D_7")[0].split(";") else: raise ValueError("Incompatible lineage string") else: raise ValueError("Style has to be either 'gg' or 'silva'") taxa = [l.strip().rsplit("__", 1)[-1] for l in tax_list] return cls(*taxa)
[docs] def to_str(self, style: str, level: str) -> str: """ Return the string Lineage of the instance in requested 'style' Parameters ---------- style : {'gg', 'silva'} The style of the lineage string level : str The lowest Lineage field that is to be populated Returns ------- str """ if level not in self._fields: raise ValueError(f"{level} not a valid field for Lineage") else: ind = self._fields.index(level) fields = self._fields[: ind + 1] data = self[: ind + 1] if style == "gg": prefix = [f.lower()[0] for f in fields] elif style == "silva": prefix = [f"D_{i}" for i in range(len(fields))] else: raise ValueError("Style needs to be either 'gg' or 'silva'") return ";".join(f"{p}__{v}" for p, v in zip(prefix, data))
def __str__(self) -> str: """ Get the lineage in the form of a string Returns ------- str The lineage string in 'gg' format """ return self.to_str(style="gg", level="Species")
[docs] def to_dict(self, level: str) -> Dict[str, str]: """ Get the lineage in the form of a dictionary Parameters ---------- level : str The lowest Lineage field to be used to populate the dictionary """ if level not in self._fields: raise ValueError(f"{level} not a valid field for Lineage") ind = self._fields.index(level) fields = self._fields[: ind + 1] return {field: tax for field, tax in zip(fields, self)}
[docs] def get_superset(self, level: str) -> "Lineage": """ Return a superset of the current lineage for the requested level Parameters ---------- level : str The lowest Lineage field to be used to calculate the superset Returns ------- Lineage Lineage instance that is a superset of current instance """ if level not in self._fields: raise ValueError(f"{level} not a valid field for Lineage") ind = self._fields.index(level) tax = self[: ind + 1] return Lineage(*tax)
@property def taxid(self) -> Tuple[str, int]: """ Get the NCBI taxonomy id of the Lineage Returns ------- Tuple[str, int] A tuple containing (taxonomy level, NCBI taxonomy id) """ query = list(self) # species or subspecies level query.append(query[-2] + " " + query[-1].strip()) # species level query[-2] = query[-3] + " " + query[-2].split(" ")[0].strip() taxid_dict = self._ncbi.get_name_translator(query) taxid_list = [12908] for taxa in reversed(query): if taxa != "" and taxa in taxid_dict: taxid_list = taxid_dict[taxa] break name = [q for q in reversed(query) if q != ""] if taxa != name[0] and taxa != name[1]: warning_msg = ( f"Lowest level in {self} could not be queried. Using higher level" ) LOG.logger.warning(warning_msg) warn(RuntimeWarning(warning_msg)) if len(taxid_list) > 1: warning_msg = f"{self.name} has multiple taxids. Picking the first one" LOG.logger.warning(warning_msg) warn(RuntimeWarning(warning_msg)) taxid = taxid_list[0] rank = self._fields[min(query.index(taxa), len(self._fields) - 1)] return rank, taxid
[docs] @classmethod def from_taxid(cls, taxid: int) -> "Lineage": """ Create `Lineage` instance from taxid Parameters ---------- taxid : int A valid NCBI taxonomy id Returns ------- "Lineage" Instance of the `Lineage` class """ ncbi = NCBITaxa() lineage_taxids = ncbi.get_lineage(taxid) lineage_names = ncbi.get_taxid_translator(lineage_taxids) lineage_ranks = { v.capitalize(): k for k, v in ncbi.get_rank(lineage_taxids).items() } if "Superkingdom" in lineage_ranks: lineage_ranks["Kingdom"] = lineage_ranks["Superkingdom"] del lineage_ranks["Superkingdom"] taxa: Dict[str, str] = {} for field in cls._fields: if field in lineage_ranks: taxa[field] = lineage_names[lineage_ranks[field]] else: break return cls(**taxa)