Skip to content

Commit

Permalink
MRG: Use RankLineageInfo to simplify reading lineages (#2467)
Browse files Browse the repository at this point in the history
  • Loading branch information
bluegenes authored Feb 13, 2023
1 parent fa3ead6 commit f597fa8
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 110 deletions.
118 changes: 58 additions & 60 deletions src/sourmash/tax/tax_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class BaseLineageInfo:
optional:
lineage: tuple or list of LineagePair
lineage_str: `;`- or `,`-separated string of names
lineage_dict: dictionary of {rank: name}
If no lineage information is provided, result will be a BaseLineageInfo
with provided ranks and no lineage names.
Expand All @@ -63,7 +62,6 @@ class BaseLineageInfo:
ranks: tuple() # require ranks
lineage: tuple = None # tuple of LineagePairs
lineage_str: str = field(default=None, compare=False) # ';'- or ','-separated str of lineage names
lineage_dict: dict = field(default=None, compare=False) # dict of rank: name

def __post_init__(self):
"Initialize according to passed values"
Expand All @@ -74,8 +72,6 @@ def __post_init__(self):
self._init_from_lineage_tuples()
elif self.lineage_str is not None:
self._init_from_lineage_str()
elif self.lineage_dict is not None:
self._init_from_lineage_dict()
else:
self._init_empty()

Expand Down Expand Up @@ -170,37 +166,6 @@ def _init_from_lineage_tuples(self):
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)

def _init_from_lineage_dict(self):
'initialize from lineage dict, e.g. from gather csv, allowing empty ranks and reordering if necessary'
if not isinstance(self.lineage_dict, (dict)):
raise ValueError(f"{self.lineage_dict} is not dictionary")
# first, initialize_empty
new_lineage = []
# build empty lineage
for rank in self.ranks:
new_lineage.append(LineagePair(rank=rank))
# now add input information in correct spots. This corrects for order and allows empty values.
for rank, info in self.lineage_dict.items():
try:
rank_idx = self.rank_index(rank)
except ValueError as e:
raise ValueError(f"Rank '{rank}' not present in {', '.join(self.ranks)}") from e

name, taxid = None, None
if isinstance(info, dict):
if 'name' in info.keys():
name = info['name']
if 'taxid' in info.keys():
taxid = info['taxid']
elif isinstance(info, str):
name = info
new_lineage[rank_idx] = LineagePair(rank=rank, name=name, taxid=taxid)
# build list of filled ranks
filled_ranks = [a.rank for a in new_lineage if a.name]
# set lineage and filled_ranks
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)

def _init_from_lineage_str(self):
"""
Turn a ; or ,-separated set of lineages into a list of LineagePair objs.
Expand Down Expand Up @@ -329,6 +294,7 @@ class RankLineageInfo(BaseLineageInfo):
and will not be used or compared in any other class methods.
"""
ranks: tuple = ('superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species', 'strain')
lineage_dict: dict = field(default=None, compare=False) # dict of rank: name

def __post_init__(self):
"Initialize according to passed values"
Expand All @@ -344,6 +310,53 @@ def __post_init__(self):
elif self.ranks:
self._init_empty()

def _init_from_lineage_dict(self):
"""
Initialize from lineage dict, e.g. from lineages csv.
Use NCBI taxids if available as '|'-separated 'taxpath' column.
Allows empty ranks/extra columns and reordering if necessary
"""
null_names = set(['[Blank]', 'na', 'null', 'NA', ''])
if not isinstance(self.lineage_dict, (dict)):
raise ValueError(f"{self.lineage_dict} is not dictionary")
new_lineage = []
taxpath=[]
# build empty lineage and taxpath
for rank in self.ranks:
new_lineage.append(LineagePair(rank=rank))

# check for NCBI taxpath information
taxpath_str = self.lineage_dict.get('taxpath', [])
if taxpath_str:
taxpath = taxpath_str.split('|')
if len(taxpath) > len(self.ranks):
raise ValueError(f"Number of NCBI taxids ({len(taxpath)}) exceeds number of ranks ({len(self.ranks)})")

# now add rank information in correct spots. This corrects for order and allows empty ranks and extra dict keys
for key, val in self.lineage_dict.items():
name, taxid = None, None
try:
rank, name = key, val
rank_idx = self.rank_index(rank)
except ValueError:
continue # ignore dictionary entries (columns) that don't match a rank

if taxpath:
try:
taxid = taxpath[rank_idx]
except IndexError:
taxid = None
# filter null
if name is not None and name.strip() in null_names:
name = None
new_lineage[rank_idx] = LineagePair(rank=rank, name=name, taxid=taxid)

# build list of filled ranks
filled_ranks = [a.rank for a in new_lineage if a.name]
# set lineage and filled_ranks
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)


def get_ident(ident, *,
keep_full_identifiers=False, keep_identifier_versions=False):
Expand Down Expand Up @@ -754,15 +767,14 @@ def load(cls, filename, *, delimiter=',', force=False,
else:
header_str = ",".join([repr(x) for x in header])
raise ValueError(f'No taxonomic identifiers found; headers are {header_str}')

# is "strain" an available rank?
if "strain" in header:
include_strain=True
load_taxids=False
if 'taxpath' in header:
load_taxids=True

# check that all ranks are in header
ranks = list(lca_utils.taxlist(include_strain=include_strain))
ranks = list(RankLineageInfo().taxlist)
if not include_strain:
ranks.remove('strain')
if not set(ranks).issubset(header):
# for now, just raise err if not all ranks are present.
# in future, we can define `ranks` differently if desired
Expand All @@ -777,40 +789,26 @@ def load(cls, filename, *, delimiter=',', force=False,
# now parse and load lineages
for n, row in enumerate(r):
num_rows += 1
lineage = []
taxid=None
# read row into a lineage pair
if load_taxids:
taxpath = row['taxpath'].split('|')
for n, rank in enumerate(lca_utils.taxlist(include_strain=include_strain)):
lin = row[rank]
if load_taxids:
taxid = taxpath[n]
lineage.append(LineagePair(rank, name=lin, taxid=taxid))
# read lineage from row dictionary
lineageInfo = RankLineageInfo(lineage_dict=row)
# get identifier
ident = row[identifier]

# fold, spindle, and mutilate ident?
ident = get_ident(ident,
keep_full_identifiers=keep_full_identifiers,
keep_identifier_versions=keep_identifier_versions)

# clean lineage of null names, replace with 'unassigned'
lineage = [ (lin.rank, lca_utils.filter_null(lin.name), lin.taxid) for lin in lineage ]
lineage = [ LineagePair(a, b, c) for (a, b, c) in lineage ]

# remove end nulls
while lineage and lineage[-1].name == 'unassigned':
lineage = lineage[:-1]

# store lineage tuple
lineage = lineageInfo.filled_lineage
if lineage:
# check duplicates
if ident in assignments:
if assignments[ident] != tuple(lineage):
if assignments[ident] != lineage:
if not force:
raise ValueError(f"multiple lineages for identifier {ident}")
else:
assignments[ident] = tuple(lineage)
assignments[ident] = lineage

if lineage[-1].rank == 'species':
n_species += 1
Expand Down
126 changes: 76 additions & 50 deletions tests/test_tax_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,46 +1074,6 @@ def test_BaseLineageInfo_init_lca_lineage_tups():
assert taxinf.zip_lineage()== ['a', '', 'b']


def test_BaseLineageInfo_init_lineage_dict_fail():
ranks=["A", "B", "C"]
lin_tups = (LineagePair(rank="A", name='a'), LineagePair(rank="C", name='b'))
with pytest.raises(ValueError) as exc:
taxinf = BaseLineageInfo(ranks=ranks, lineage_dict=lin_tups)
print(str(exc))

assert "is not dictionary" in str(exc)


def test_BaseLineageInfo_init_lineage_dict ():
x = {'rank1': 'name1', 'rank2': 'name2'}
taxinf = BaseLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', 'name2']


def test_BaseLineageInfo_init_lineage_dict_withtaxid():
x = {'rank1': {'name': 'name1', 'taxid': 1}, 'rank2': {'name':'name2', 'taxid': 2}}
taxinf = BaseLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', 'name2']
assert taxinf.zip_taxid()== ['1', '2']
assert taxinf.lowest_lineage_taxid == 2
assert taxinf.lowest_lineage_name == "name2"


def test_BaseLineageInfo_init_lineage_str_lineage_dict_test_eq():
x = "a;b;c"
ranks=["A", "B", "C"]
rankD = {"A": "a", "B": "b", "C": "c"}
lin1 = BaseLineageInfo(lineage_str=x, ranks=ranks)
lin2 = BaseLineageInfo(lineage_dict=rankD, ranks=ranks)
assert lin1 == lin2


def test_BaseLineageInfo_init_no_ranks():
x = "a;b;c"
rankD = {"superkingdom": "a", "phylum": "b", "class": "c"}
Expand All @@ -1122,10 +1082,6 @@ def test_BaseLineageInfo_init_no_ranks():
BaseLineageInfo(lineage_str=x)
print(exc)
assert "__init__() missing 1 required positional argument: 'ranks'" in str(exc)
with pytest.raises(TypeError) as exc:
BaseLineageInfo(lineage_dict=rankD)
print(exc)
assert "__init__() missing 1 required positional argument: 'ranks'" in str(exc)
with pytest.raises(TypeError) as exc:
BaseLineageInfo(lineage=lin_tups)
print(exc)
Expand All @@ -1140,10 +1096,6 @@ def test_BaseLineageInfo_init_with_wrong_ranks():
BaseLineageInfo(lineage=lin_tups, ranks=ranks)
print(str(exc))
assert "Rank 'rank1' not present in A, B, C" in str(exc)
with pytest.raises(ValueError) as exc:
BaseLineageInfo(lineage_dict=linD, ranks=ranks)
print(str(exc))
assert "Rank 'rank1' not present in A, B, C" in str(exc)


def test_BaseLineageInfo_init_not_lineagepair():
Expand Down Expand Up @@ -1187,14 +1139,55 @@ def test_RankLineageInfo_init_lineage_tups():
assert taxinf.zip_lineage()== ['a', 'b', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_fail():
ranks=["A", "B", "C"]
lin_tups = (LineagePair(rank="A", name='a'), LineagePair(rank="C", name='b'))
with pytest.raises(ValueError) as exc:
taxinf = RankLineageInfo(ranks=ranks, lineage_dict=lin_tups)
print(str(exc))

assert "is not dictionary" in str(exc)


def test_RankLineageInfo_init_lineage_dict():
x = {'rank1': 'name1', 'rank2': 'name2'}
taxinf = RankLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', 'name2']


def test_RankLineageInfo_init_lineage_dict_default_ranks():
x = {"superkingdom":'a',"phylum":'b'}
taxinf = RankLineageInfo(lineage_dict=x)
print(taxinf.lineage)
print(taxinf.lineage_str)
assert taxinf.zip_lineage()== ['a', 'b', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_withtaxpath():
x = {'rank1': 'name1', 'rank2': 'name2', 'taxpath': "1|2"}
taxinf = RankLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"])
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
print("zipped taxids: ", taxinf.zip_taxid())
assert taxinf.zip_lineage()== ['name1', 'name2']
assert taxinf.zip_taxid()== ['1', '2']
assert taxinf.lowest_lineage_taxid == "2"
assert taxinf.lowest_lineage_name == "name2"


def test_RankLineageInfo_init_lineage_str_lineage_dict_test_eq():
x = "a;b;c"
ranks=["A", "B", "C"]
rankD = {"A": "a", "B": "b", "C": "c"}
lin1 = RankLineageInfo(lineage_str=x, ranks=ranks)
lin2 = RankLineageInfo(lineage_dict=rankD, ranks=ranks)
assert lin1 == lin2


def test_RankLineageInfo_init_lineage_dict_missing_rank():
x = {'superkingdom': 'name1', 'class': 'name2'}
taxinf = RankLineageInfo(lineage_dict=x)
Expand All @@ -1205,8 +1198,8 @@ def test_RankLineageInfo_init_lineage_dict_missing_rank():
assert taxinf.zip_lineage(truncate_empty=True)== ['name1', '', 'name2']


def test_RankLineageInfo_init_lineage_dict_missing_rank_withtaxid():
x = {'superkingdom': {'name': 'name1', 'taxid': 1}, 'class': {'name':'name2', 'taxid': 2}}
def test_RankLineageInfo_init_lineage_dict_missing_rank_with_taxpath():
x = {'superkingdom': 'name1', 'class': 'name2', 'taxpath': '1||2'}
taxinf = RankLineageInfo(lineage_dict=x)
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
Expand All @@ -1215,6 +1208,39 @@ def test_RankLineageInfo_init_lineage_dict_missing_rank_withtaxid():
assert taxinf.zip_taxid()== ['1', '', '2', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_name_taxpath_mismatch():
# If there's no name, we don't report the taxpath, because lineage is not "filled".
# Is this desired behavior?
x = {'superkingdom': 'name1', 'taxpath': '1||2'}
taxinf = RankLineageInfo(lineage_dict=x)
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
assert taxinf.zip_lineage()== ['name1', '', '', '', '', '', '', '']
assert taxinf.zip_taxid()== ['1', '', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_name_taxpath_missing_taxids():
# If there's no name, we don't report the taxpath, because lineage is not "filled".
# Is this desired behavior?
x = {'superkingdom': 'name1', 'phylum': "name2", "class": "name3", 'taxpath': '|2'}
taxinf = RankLineageInfo(lineage_dict=x)
print("ranks: ", taxinf.ranks)
print("lineage: ", taxinf.lineage)
print("zipped lineage: ", taxinf.zip_lineage())
print("zipped taxids: ", taxinf.zip_taxid())
assert taxinf.zip_lineage()== ['name1', 'name2', 'name3', '', '', '', '', '']
assert taxinf.zip_taxid()== ['', '2', '', '', '', '', '', '']


def test_RankLineageInfo_init_lineage_dict_taxpath_too_long():
x = {'superkingdom': 'name1', 'class': 'name2', 'taxpath': '1||2||||||||||'}
with pytest.raises(ValueError) as exc:
RankLineageInfo(lineage_dict=x)
print(str(exc))
assert f"Number of NCBI taxids (13) exceeds number of ranks (8)" in str(exc)


def test_RankLineageInfo_init_lineage_str_lineage_dict_test_eq():
x = "a;b;c"
rankD = {"superkingdom": "a", "phylum": "b", "class": "c"}
Expand Down

0 comments on commit f597fa8

Please sign in to comment.