Source code for physcraper.ncbi_data_parser

"""uses ncbi databases to easily retrieve taxonomic information.

parts are altered from https://github.com/zyxue/ncbitax2lin/blob/master/ncbitax2lin.py
"""

import os
import sys
import pandas as pd
from physcraper.helpers import debug



_DEBUG = 0


[docs]def get_acc_from_blast(query_string): """ Get the accession number from a blast query. :param query_string: string that contains acc and gi from local blast query result :return: gb_acc """ if len(query_string.split("|")) >= 3: gb_acc = query_string.split("|")[3] else: gb_acc = query_string.split("|")[0] if len(gb_acc.split(".")) < 2: sys.stderr.write("query string {} does not contain a Genbank accession number. Skipping\n".format(query_string)) return None assert len(gb_acc.split(".")) >= 2, (len(gb_acc.split(".")), gb_acc) return gb_acc
[docs]def get_gi_from_blast(query_string): """ Get the gi number from a blast query. Get acc is more difficult now, as new seqs not always have gi number, then query changes. If not available return None. :param query_string: string that contains acc and gi from local blast query result :return: gb_id if available """ if len(query_string.split("|")) >= 3: gb_gi = query_string.split("|")[1] assert len(gb_gi.split(".")) < 2, (len(gb_gi.split(".")), gb_gi) assert gb_gi.isdigit() is True return gb_gi else: return None
[docs]def get_tax_info_from_acc(gb_id, ids_obj): '''takes an accession number and returns the ncbi_id and the taxon name''' # debug("Getting tax info from acc {}".format(gb_id)) ncbi_id = ids_obj.get_ncbiid_from_acc(gb_id) tax_name = ids_obj.ncbiid_to_spn.get(ncbi_id) if ncbi_id is None: sys.stderr.write("Failed to get information for sequence with accession number {}\n".format(gb_id)) return ncbi_id, tax_name
[docs]def get_ncbi_tax_id(handle): """Get the taxon ID from ncbi. ONly used for web queries :param handle: NCBI read.handle :return: ncbi_id """ ncbi_id = None gb_list = handle[0]["GBSeq_feature-table"][0]["GBFeature_quals"] for item in gb_list: if item[u"GBQualifier_name"] == "db_xref": if item[u"GBQualifier_value"][:5] == "taxon": ncbi_id = int(item[u"GBQualifier_value"][6:]) break else: continue return ncbi_id
[docs]def get_ncbi_tax_name(handle): """Get the sp name from ncbi. Could be replaced by direct lookup to ott_ncbi. :param handle: NCBI read.handle :return: ncbi_spn """ ncbi_sp = None gb_list = handle[0]["GBSeq_feature-table"][0]["GBFeature_quals"] for item in gb_list: if item[u"GBQualifier_name"] == "organism": ncbi_sp = str(item[u"GBQualifier_value"]) ncbi_sp = ncbi_sp.replace(" ", "_") return ncbi_sp
[docs]def strip(inputstr): """ Strips of blank characters from string in pd dataframe. """ if isinstance(inputstr, str): return inputstr.strip() else: return inputstr
[docs]def load_nodes(nodes_file): """ Loads nodes.dmp and converts it into a pandas.DataFrame. Contains the information about the taxonomic hierarchy of names. """ # print(nodes_file) assert os.path.exists(nodes_file), ( "file `%s` does not exist. Make sure you downloaded the " "databases from ncbi." % nodes_file ) df = pd.read_csv( nodes_file, sep="|", header=None, index_col=False, names=[ "tax_id", "parent_tax_id", "rank", "embl_code", "division_id", "inherited_div_flag", "genetic_code_id", "inherited_GC__flag", "mitochondrial_genetic_code_id", "inherited_MGC_flag", "GenBank_hidden_flag", "hidden_subtree_root_flag", "comments", ], ) # To get rid of flanking tab characters df["rank"] = df["rank"].apply(strip) df["embl_code"] = df["embl_code"].apply(strip) df["comments"] = df["comments"].apply(strip) return df
[docs]def load_names(names_file): """ Loads names.dmp and converts it into a pandas.DataFrame. Includes only names which are accepted as scientific name by ncbi. """ assert os.path.exists(names_file), ( "file `%s` does not exist. Make sure you downloaded the " "databases from ncbi." % names_file ) df = pd.read_csv( names_file, sep="|", header=None, index_col=False, names=["tax_id", "name_txt", "unique_name", "name_class"], ) df["name_txt"] = df["name_txt"].apply(strip) df["unique_name"] = df["unique_name"].apply(strip) df["name_class"] = df["name_class"].apply(strip) sci_df = df[df["name_class"] == "scientific name"] sci_df.reset_index(drop=True, inplace=True) return sci_df
[docs]def load_synonyms(names_file): """Loads names.dmp and converts it into a pandas.DataFrame. Includes only names which are viewed as synonym by ncbi. """ assert os.path.exists(names_file), ( "file `%s` does not exist. Make sure you downloaded " "the databases from ncbi." % names_file ) # print("load synonyms") df = pd.read_csv( names_file, sep="|", header=None, index_col=False, names=["tax_id", "name_txt", "unique_name", "name_class"], ) df["name_txt"] = df["name_txt"].apply(strip) df["unique_name"] = df["unique_name"].apply(strip) df["name_class"] = df["name_class"].apply(strip) sci_df = df[df["name_class"] == "synonym"] sci_df.reset_index(drop=True, inplace=True) return sci_df
[docs]class Parser: """Reads in databases from ncbi to connect species names with the taxonomic identifier and the corresponding hierarchical information. It provides a much faster way to get those information then using web queries. We use those files to get independent from web requests to find those information (the implementation of it in BioPython was not really reliable). Nodes includes the hierarchical information, names the scientific names and ID's. The files need to be updated regularly, best way to always do it when a new blast database was loaded. """ def __init__(self, names_file, nodes_file): self.names_file = names_file self.nodes_file = nodes_file # self.initialize() self.nodes = load_nodes(self.nodes_file) self.names = load_names(self.names_file) self.synonyms = load_synonyms(self.names_file)
[docs] def get_rank(self, tax_id): """ Get rank for given ncbi tax id. """ if tax_id is None: rank = "unassigned" else: nodes = self.nodes rank = nodes[nodes["tax_id"] == tax_id]["rank"].values[0] return rank
[docs] def get_downtorank_id(self, tax_id, downtorank="species"): """ Recursive function to find the parent id of a taxon as defined by downtorank. """ nodes = self.nodes if not isinstance(tax_id, int): # debug("get downtorank") # sys.stdout.write( # "WARNING: tax_id {} is no integer. Will convert value to int\n".format( # tax_id # ) # ) tax_id = int(tax_id) # debug(downtorank) # following statement is to get id of taxa if taxa is higher ranked than specified if nodes[nodes["tax_id"] == tax_id]["rank"].values[0] != "species": if downtorank == "species": if (nodes[nodes["tax_id"] == tax_id]["rank"].values[0] != "varietas" and nodes[nodes["tax_id"] == tax_id]["rank"].values[0] != "subspecies"): return tax_id if nodes[nodes["tax_id"] == tax_id]["rank"].values[0] == downtorank: # debug("found right rank") return tax_id elif nodes[nodes["tax_id"] == tax_id]["rank"].values[0] == "superkingdom": tax_id = 0 return tax_id else: parent_id = int(nodes[nodes["tax_id"] == tax_id]["parent_tax_id"].values[0]) return self.get_downtorank_id(parent_id, downtorank)
[docs] def match_id_to_mrca(self, tax_id, mrca_id): """ Recursive function to find out if tax_id is part of mrca_id. """ # debug("match_id_to_mrca") nodes = self.nodes # debug("testing if {} within {}".format(tax_id, mrca_id)) current_id = int(tax_id) mrca_id = int(mrca_id) #debug([rank_mrca_id, rank_tax_id]) while current_id: if current_id == mrca_id: # debug("found right rank") return True elif current_id == 1: # debug("current id is: {}".format(current_id)) return False elif current_id == 0: debug("current id is: {}, in search for {} in {}".format(current_id, tax_id, mrca_id)) return False else: #try parent try: current_id = int(nodes[nodes["tax_id"] == current_id]["parent_tax_id"].values[0]) # to get the except: # current_id = 17043521 # nodes = ncbitax.nodes ## ncbitax object from test_ncbi_parser.py except IndexError: sys.stderr.write("no parent found for ncbi:id {}".format(current_id)) return False
# debug("parent id is: {}".format(current_id))
[docs] def get_name_from_id(self, tax_id): """ Find the scientific name for a given ID. """ try: names = self.names if tax_id == 0: tax_name = "unidentified" else: tax_name = names[names["tax_id"] == tax_id]["name_txt"] tax_name = tax_name.values[0].replace(" ", "_") tax_name = tax_name.strip() except IndexError: sys.stdout.write("tax_id {} unknown by ncbi_parser files (names.dmp)\n".format(tax_id)) tax_name = "unknown_{}".format(tax_id) if os.path.exists("ncbi_id_unknown.err"): fn = open("ncbi_id_unknown.err", "a") fn.write("{}".format(tax_id)) fn.close() else: fn = open("ncbi_id_unknown.err", "w") fn.write("{}".format(tax_id)) fn.close() return tax_name
[docs] def get_id_from_name(self, tax_name): """ Find the ID for a given taxonomic name. """ names = self.names org_tax = tax_name tax_name = tax_name.replace("_", " ") if len(tax_name.split(" ")) >= 2: if tax_name.split(" ")[1] == "sp.": tax_name = "{}".format(tax_name.split(" ")[0]) try: tax_id = names[names["name_txt"] == tax_name]["tax_id"].values[0] except IndexError: if len(tax_name.split(" ")) == 3: tax_name = "{} {}-{}".format( tax_name.split(" ")[0], tax_name.split(" ")[1], tax_name.split(" ")[2], ) tax_id = names[names["name_txt"] == tax_name]["tax_id"].values[0] sys.stdout.write( "tax_name {} unknown, modified to {} worked.\n".format(org_tax, tax_name) ) else: sys.stdout.write( "Are you sure, its an accepted name and not a synonym: {}? " "I look in the synonym table now.\n".format(tax_name) ) tax_id = self.get_id_from_synonym(tax_name) tax_id = int(tax_id) return tax_id
[docs] def get_id_from_synonym(self, tax_name): """ Find the ID for a given taxonomic name, which is not an accepted name. """ names = self.names synonyms = self.synonyms tax_name = tax_name.replace("_", " ") try: tax_id = synonyms[synonyms["name_txt"] == tax_name]["tax_id"].values[0] except IndexError: if len(tax_name.split(" ")) == 3: tax_name = "{} {}-{}".format( tax_name.split(" ")[0], tax_name.split(" ")[1], tax_name.split(" ")[2], ) tax_id = names[names["name_txt"] == tax_name]["tax_id"].values[0] else: sys.stderr.write("ncbi taxon name unknown by parser files: {}, taxid set to 0.\n".format(tax_name)) tax_id = 0 if os.path.exists("ncbi_name_unknown.err"): fn = open("ncbi_name_unknown.err", "a") fn.write("{}".format(tax_id)) fn.close() else: fn = open("ncbi_name_unknown.err", "w") fn.write("{}".format(tax_id)) fn.close() return tax_id