Bigbed search (#4046)

* implement chromosome searches

* include chromosome coordinates in search

* add test

* update
This commit is contained in:
mdehoon
2022-08-23 16:11:30 +09:00
committed by GitHub
parent 1ebd1bb919
commit 07026ecbd0
3 changed files with 191 additions and 3 deletions

View File

@ -25,8 +25,8 @@ You are expected to use this module via the Bio.Align functions.
import numpy
import zlib
import struct
import zlib
from collections import namedtuple
@ -123,7 +123,7 @@ class AlignmentIterator(interfaces.AlignmentIterator):
self.tree = self._read_index(stream, fullIndexOffset)
self._data = self._search_index(stream)
self._data = self._iterate_index(stream)
def _read_chromosomes(self, stream, pos):
byteorder = self.byteorder
@ -331,7 +331,7 @@ class AlignmentIterator(interfaces.AlignmentIterator):
pos = dataOffsets.pop(id(node))
stream.seek(pos)
def _search_index(self, stream):
def _iterate_index(self, stream):
byteorder = self.byteorder
if byteorder == "little":
byteorder_char = "<"
@ -377,8 +377,76 @@ class AlignmentIterator(interfaces.AlignmentIterator):
else:
node = children[0]
def _search_index(self, stream, chromIx, start, end):
byteorder = self.byteorder
if byteorder == "little":
byteorder_char = "<"
elif byteorder == "big":
byteorder_char = ">"
else:
raise ValueError("Unexpected byteorder '%s'" % byteorder)
padded_start = start - 1
padded_end = end + 1
node = self.tree
while True:
try:
children = node.children
except AttributeError:
stream.seek(node.dataOffset)
data = stream.read(node.dataSize)
if self._compressed > 0:
data = zlib.decompress(data)
while data:
# Supplemental Table 12: Binary BED-data format
# chromId 4 bytes
# chromStart 4 bytes
# chromEnd 4 bytes
# rest zero-terminated string in tab-separated format
child_chromIx, child_chromStart, child_chromEnd = struct.unpack(
byteorder_char + "III", data[:12]
)
rest, data = data[12:].split(b"\00", 1)
if child_chromIx != chromIx:
continue
if end <= child_chromStart or child_chromEnd <= start:
if child_chromStart != child_chromEnd:
continue
if child_chromStart != end and child_chromEnd != start:
continue
yield (child_chromIx, child_chromStart, child_chromEnd, rest)
else:
visit_child = False
for child in children:
if (child.endChromIx, child.endBase) < (chromIx, padded_start):
continue
if (chromIx, padded_end) < (child.startChromIx, child.startBase):
continue
visit_child = True
break
if visit_child:
node = child
continue
while True:
parent = node.parent
if parent is None:
return
for index, child in enumerate(parent.children):
if id(node) == id(child):
break
else:
raise RuntimeError("Failed to find child node")
try:
node = parent.children[index + 1]
except IndexError:
node = parent
else:
break
def _read_next_alignment(self, stream):
chunk = next(self._data)
return self._create_alignment(chunk)
def _create_alignment(self, chunk):
chromId, chromStart, chromEnd, rest = chunk
words = rest.decode().split("\t")
target_record = self.targets[chromId]
@ -467,3 +535,40 @@ class AlignmentIterator(interfaces.AlignmentIterator):
def __len__(self):
return self._length
def search(self, chromosome=None, start=None, end=None):
"""Iterate over alignments overlapping the specified chromosome region..
This method searches the index to find alignments to the specified
chromosome that fully or partially overlap the chromosome region
between start and end.
Arguments:
- chromosome - chromosome name. If None (default value), include all
alignments.
- start - starting position on the chromosome. If None (default
value), use 0 as the starting position.
- end - end position on the chromosome. If None (default value),
use the length of the chromosome as the end position.
"""
stream = self._stream
if chromosome is None:
if start is not None or end is not None:
raise ValueError(
"start and end must both be None if chromosome is None"
)
else:
for chromIx, target in enumerate(self.targets):
if target.id == chromosome:
break
else:
raise ValueError("Failed to find %s in alignments" % chromosome)
if start is None:
start = 0
if end is None:
end = len(target)
data = self._search_index(stream, chromIx, start, end)
for chunk in data:
alignment = self._create_alignment(chunk)
yield alignment

BIN
Tests/Blat/bigbedtest.bb Normal file

Binary file not shown.

View File

@ -1930,6 +1930,89 @@ class TestAlign_bed12(unittest.TestCase):
self.fail(f"More than two alignments reported in {filename}")
class TestAlign_searching(unittest.TestCase):
# The bigBed file bigbedbigbedtest.bb contains the following data:
# chr1 10 100 name1 1 +
# chr1 29 39 name2 2 -
# chr1 200 300 name3 3 +
# chr2 50 50 name4 6 +
# chr2 100 110 name5 4 +
# chr2 200 210 name6 5 +
# chr2 220 220 name7 6 +
# chr3 0 0 name8 7 -
def test_search_chromosome(self):
path = "Blat/bigbedtest.bb"
alignments = bigbed.AlignmentIterator(path)
selected_alignments = alignments.search("chr2", 0, 1000)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name4", "name5", "name6", "name7"])
def test_search_region(self):
path = "Blat/bigbedtest.bb"
alignments = bigbed.AlignmentIterator(path)
selected_alignments = alignments.search("chr2", 105, 1000)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name5", "name6", "name7"])
selected_alignments = alignments.search("chr2", 110, 1000)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name6", "name7"])
selected_alignments = alignments.search("chr2", 40, 50)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name4"])
selected_alignments = alignments.search("chr2", 50, 50)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name4"])
selected_alignments = alignments.search("chr2", 50, 200)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name4", "name5"])
selected_alignments = alignments.search("chr2", 200, 220)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name6", "name7"])
selected_alignments = alignments.search("chr2", 220, 220)
names = [alignment.query.id for alignment in selected_alignments]
self.assertEqual(names, ["name7"])
def test_three_iterators(self):
"""Create three iterators and use them concurrently."""
path = "Blat/bigbedtest.bb"
alignments1 = bigbed.AlignmentIterator(path)
alignments2 = alignments1.search("chr2")
alignments3 = alignments1.search("chr2", 110, 1000)
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name1")
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name2")
alignment2 = next(alignments2)
self.assertEqual(alignment2.query.id, "name4")
alignment2 = next(alignments2)
self.assertEqual(alignment2.query.id, "name5")
alignment2 = next(alignments2)
self.assertEqual(alignment2.query.id, "name6")
alignment3 = next(alignments3)
self.assertEqual(alignment3.query.id, "name6")
alignment3 = next(alignments3)
self.assertEqual(alignment3.query.id, "name7")
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name3")
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name4")
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name5")
alignment2 = next(alignments2)
self.assertEqual(alignment2.query.id, "name7")
self.assertRaises(StopIteration, next, alignments2)
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name6")
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name7")
self.assertRaises(StopIteration, next, alignments3)
alignment1 = next(alignments1)
self.assertEqual(alignment1.query.id, "name8")
self.assertRaises(StopIteration, next, alignments1)
if __name__ == "__main__":
runner = unittest.TextTestRunner(verbosity=2)
unittest.main(testRunner=runner)