Infer the tabular format from the header

This commit is contained in:
Samuel Prince
2025-01-19 09:35:33 -05:00
committed by Wibowo Arindrarto
parent c30a15fbae
commit 38f0fed34a
3 changed files with 252 additions and 14 deletions

View File

@ -13,6 +13,7 @@ from Bio.SearchIO._model import HSPFragment
from Bio.SearchIO._model import QueryResult
from ._base import _BaseInfernalParser
from Bio.File import _open_for_random_access
__all__ = ("InfernalTabParser", "InfernalTabIndexer")
@ -92,7 +93,80 @@ _TAB_FORMAT = {
"description",
),
}
_DEFAULT_TAB_FORMAT = 1
_TABULAR_FMT_1_HEADER_FIELDS = [
"target name",
"accession",
"query name",
"accession",
"mdl",
"mdl from",
"mdl to",
"seq from",
"seq to",
"strand",
"trunc",
"pass",
"gc",
"bias",
"score",
"E-value",
"inc",
"description of target",
]
_TABULAR_FMT_2_HEADER_FIELDS = [
"idx",
"target name",
"accession",
"query name",
"accession",
"clan name",
"mdl",
"mdl from",
"mdl to",
"seq from",
"seq to",
"strand",
"trunc",
"pass",
"gc",
"bias",
"score",
"E-value",
"inc",
"olp",
"anyidx",
"afrct1",
"afrct2",
"winidx",
"wfrct1",
"wfrct2",
"mdl len",
"seq len",
"description of target",
]
_TABULAR_FMT_3_HEADER_FIELDS = [
"target name",
"accession",
"query name",
"accession",
"mdl",
"mdl from",
"mdl to",
"seq from",
"seq to",
"strand",
"trunc",
"pass",
"gc",
"bias",
"score",
"E-value",
"inc",
"mdl len",
"seq len",
"description of target",
]
# column to class attribute map
_COLUMN_QRESULT = {
@ -134,25 +208,81 @@ _COLUMN_FRAG = {
}
def tabular_format_from_header(handle, is_byte=False):
"""Determine the tabular format based on the header (PRIVATE)."""
# infernal tabular output columns are space separated files with spaces in
# column names, ex:
# #target name accession
# #------------------- ---------
# so we use the second line of the header where the is not spaces to
# determine the position of the headers labels. Then we can get the label names
# and compare the the different tabular format names. This label comparison is done
# (instead of counting fields) to avoid issues if future tabular format have the
# same number of fields, but with different signification
def process_line(handle, is_byte):
"""Process a line from the handle, decoding byte stream."""
line = handle.readline().strip()
return line.decode() if is_byte else line
header_label = process_line(handle, is_byte)
header_delim = process_line(handle, is_byte)
if header_label == "" or header_delim == "":
raise ValueError(
"Unexpected empty line in the header of Infernal tabular output."
)
# get the indices of the spaces fields delimiter from the second header line
# the end of the file is added to the indices to process the last field
indices = [idx for idx, char in enumerate(header_delim) if char == " "] + [
len(header_delim)
]
prev_idx = 1
fields = []
for idx in indices:
fields.append(header_label[prev_idx:idx].strip())
prev_idx = idx
# compare the headers fields to the possible fields to determine the format
if fields == _TABULAR_FMT_1_HEADER_FIELDS:
return 1
elif fields == _TABULAR_FMT_2_HEADER_FIELDS:
return 2
elif fields == _TABULAR_FMT_3_HEADER_FIELDS:
return 3
else:
raise ValueError(
"Cannot determine the tabular format from the header. \
The tabular file is likely incorrect or its format is unsupported. \
Tabular format 1, 2 and 3 are currently supported)."
)
class InfernalTabParser(_BaseInfernalParser):
"""Parser for the Infernal tabular format."""
def __init__(self, handle, fmt=_DEFAULT_TAB_FORMAT):
def __init__(self, handle, fmt=None):
"""Initialize the class."""
self.handle = handle
self.line = self.handle.readline().strip()
if not isinstance(fmt, int):
raise TypeError
if not 1 <= fmt <= 3:
raise ValueError("Invalid tabular format number, must be 1, 2 or 3.")
self.fmt = fmt
# if a tabular fmt is specified, set its value
if isinstance(fmt, int):
if not 1 <= fmt <= 3:
raise ValueError(
"Unsupported tabular format. Format 1, 2 and 3 are currently supported."
)
self.fmt = fmt
# if the format is not provided by the user, guess the format
else:
self.fmt = tabular_format_from_header(handle)
def __iter__(self):
"""Iterate over InfernalTabParser, yields query results."""
# read through the header and footer
# read through the header (and footer if there is no results)
self.line = self.handle.readline()
while self.line.startswith("#"):
self.line = self.handle.readline()
# if we have result rows, parse it
# if we have result rows, parse them
if self.line:
yield from self._parse_qresult()
@ -286,10 +416,23 @@ class InfernalTabIndexer(SearchIndexer):
_parser = InfernalTabParser
def __init__(self, filename, fmt=_DEFAULT_TAB_FORMAT):
def __init__(self, filename, fmt=None):
"""Initialize the class."""
# the index of the query id column in the tabular file varies depending on the fmt
# we need to use the format set by the user, or infer the format to parse the records
if isinstance(fmt, int):
if not 1 <= fmt <= 3:
raise ValueError(
"Unsupported tabular format. Format 1, 2 and 3 are currently supported."
)
self._query_id_idx = 3 if fmt == 2 else 2
# if the format is not provided by the user, infer the format
else:
handle = _open_for_random_access(filename)
fmt = tabular_format_from_header(handle, is_byte=True)
self._query_id_idx = 3 if fmt == 2 else 2
SearchIndexer.__init__(self, filename, fmt=fmt)
self._query_id_idx = 3 if fmt == 2 else 2
def __iter__(self):
"""Iterate over the file handle; yields key, start offset, and length."""
@ -298,7 +441,7 @@ class InfernalTabIndexer(SearchIndexer):
qresult_key = None
start_offset = handle.tell()
# set line with initial mock value
# set line with mock value
line = b"#"
# read through header

View File

@ -210,6 +210,54 @@ class CmscanCases(unittest.TestCase):
self.assertRaises(StopIteration, next, qresults)
self.assertEqual(count, 1)
def test_cmscan_mq_mm_fmt2_infer(self):
"""Test parsing infernal-tab, cmscan, multiple queries, multiple hit, one hsp, fmt 2, inferred (IRES_5S_U2_Yeast_fmt_2)"""
tab_file = get_file("cmscan_115_IRES_5S_U2_Yeast_fmt_2.tbl")
qresults = parse(tab_file, FMT)
counter = itertools.count(start=1)
# first qresult
qresult, count = next_result(qresults, counter)
self.assertEqual(len(qresult), 1)
self.assertEqual(qresult.id, "ENA|BK006936|BK006936.2")
self.assertEqual(qresult.accession, "-")
self.assertEqual(qresult.clan, "-")
self.assertEqual(qresult.seq_len, 813184)
hit = qresult[0]
self.assertEqual(len(hit), 1)
self.assertEqual(hit.id, "U2")
self.assertEqual(hit.accession, "RF00004")
self.assertEqual(hit.description, "U2 spliceosomal RNA")
self.assertEqual(hit.seq_len, 193)
# first hsp
hsp = hit[0]
self.assertEqual(len(hsp), 1)
self.assertEqual(hsp.evalue, 1.2e-20)
self.assertEqual(hsp.bitscore, 98.7)
self.assertEqual(hsp.bias, 0.1)
self.assertEqual(hsp.gc, 0.33)
self.assertEqual(hsp.truncated, "no")
self.assertEqual(hsp.model, "cm")
self.assertEqual(hsp.pipeline_pass, 1)
self.assertTrue(hsp.is_included)
self.assertEqual("*", hsp.olp)
self.assertEqual("-", hsp.anyidx)
self.assertEqual("-", hsp.afrct1)
self.assertEqual("-", hsp.afrct2)
self.assertEqual("-", hsp.winidx)
self.assertEqual("-", hsp.wfrct1)
self.assertEqual("-", hsp.wfrct2)
frag = hsp[0]
self.assertEqual(frag.query_start, 1)
self.assertEqual(frag.query_end, 193)
self.assertEqual(frag.hit_start, 681747)
self.assertEqual(frag.hit_end, 681858)
self.assertEqual(frag.hit_strand, -1)
# test if we've properly finished iteration
self.assertRaises(StopIteration, next, qresults)
self.assertEqual(count, 1)
def test_cmscan_mq_mm_fmt3(self):
"""Test parsing infernal-tab, cmscan, multiple queries, multiple hit, one hsp, fmt 3 (IRES_5S_U2_Yeast_fmt_3)"""
tab_file = get_file("cmscan_115_IRES_5S_U2_Yeast_fmt_3.tbl")
@ -250,6 +298,46 @@ class CmscanCases(unittest.TestCase):
self.assertRaises(StopIteration, next, qresults)
self.assertEqual(count, 1)
def test_cmscan_mq_mm_fmt3_infer(self):
"""Test parsing infernal-tab, cmscan, multiple queries, multiple hit, one hsp, fmt 3, inferred (IRES_5S_U2_Yeast_fmt_3)"""
tab_file = get_file("cmscan_115_IRES_5S_U2_Yeast_fmt_3.tbl")
qresults = parse(tab_file, FMT)
counter = itertools.count(start=1)
# first qresult
qresult, count = next_result(qresults, counter)
self.assertEqual(len(qresult), 1)
self.assertEqual("ENA|BK006936|BK006936.2", qresult.id)
self.assertEqual(qresult.accession, "-")
self.assertEqual(813184, qresult.seq_len)
hit = qresult[0]
self.assertEqual(len(hit), 1)
self.assertEqual(hit.id, "U2")
self.assertEqual(hit.accession, "RF00004")
self.assertEqual(hit.description, "U2 spliceosomal RNA")
self.assertEqual(193, hit.seq_len)
# first hsp
hsp = hit[0]
self.assertEqual(len(hsp), 1)
self.assertEqual(hsp.evalue, 1.2e-20)
self.assertEqual(hsp.bitscore, 98.7)
self.assertEqual(hsp.bias, 0.1)
self.assertEqual(hsp.gc, 0.33)
self.assertEqual(hsp.truncated, "no")
self.assertEqual(hsp.model, "cm")
self.assertEqual(hsp.pipeline_pass, 1)
self.assertTrue(hsp.is_included)
frag = hsp[0]
self.assertEqual(frag.query_start, 1)
self.assertEqual(frag.query_end, 193)
self.assertEqual(frag.hit_start, 681747)
self.assertEqual(frag.hit_end, 681858)
self.assertEqual(frag.hit_strand, -1)
# test if we've properly finished iteration
self.assertRaises(StopIteration, next, qresults)
self.assertEqual(count, 1)
class CmsearchCases(unittest.TestCase):
"""Test parsing cmsearch output."""

View File

@ -55,6 +55,13 @@ U2 RF00004 ENA|BK006937|BK006937.2 - cm 1
"""
self.check_raw(filename, "ENA|BK006936|BK006936.2", raw, fmt=2)
def test_infernal_tab_multiple_fmt_2_infer(self):
"""Test infernal-tab raw string retrieval, cmsearch, multiple queries, fmt 2, inferred."""
filename = os.path.join("Infernal", "cmscan_115_IRES_5S_U2_Yeast_fmt_2.tbl")
raw = """1 U2 RF00004 ENA|BK006936|BK006936.2 - - cm 1 193 681858 681747 - no 1 0.33 0.1 98.7 1.2e-20 ! * - - - - - - 193 813184 U2 spliceosomal RNA
"""
self.check_raw(filename, "ENA|BK006936|BK006936.2", raw)
class InfernalTabIndexCases(CheckIndex):
fmt = "infernal-tab"
@ -65,7 +72,7 @@ class InfernalTabIndexCases(CheckIndex):
self.check_index(filename, self.fmt)
def test_infernal_tab_1q_0m(self):
"""Test infernal-tab indexing, cmsearch, single query, not hits."""
"""Test infernal-tab indexing, cmsearch, single query, no hits."""
filename = os.path.join("Infernal", "cmsearch_114_IRES_Yeast.tbl")
self.check_index(filename, self.fmt)