Changed several parsers avoid using next.

tempfile.NamedTemporaryFiles do not support __next__.
This commit is contained in:
Seth Sims
2025-02-04 10:45:24 -05:00
committed by Peter Cock
parent bbf3a40ed0
commit d6fcdfa131
6 changed files with 174 additions and 144 deletions

View File

@ -140,10 +140,8 @@ def MafIterator(handle, seq_count=None):
while True:
# allows parsing of the last bundle without duplicating code
try:
line = next(handle)
except StopIteration:
line = ""
line = handle.readline()
try:
# Will be in binary mode if called via the indexing code
# (which needs the raw offsets for cross platform indexing)

View File

@ -194,9 +194,8 @@ class FastaIterator(SequenceIterator):
if alphabet is not None:
raise ValueError("The alphabet argument is no longer supported")
super().__init__(source, fmt="Fasta")
try:
line = next(self.stream)
except StopIteration:
line = self.stream.readline()
if not line:
line = None
else:
if not line.startswith(">"):

View File

@ -144,16 +144,22 @@ class PirIterator(SequenceIterator):
line = self._line
if line is None:
raise StopIteration
stream = self.stream
pir_type = line[1:3]
if pir_type not in _pir_mol_type or line[3] != ";":
raise ValueError(
"Records should start with '>XX;' where XX is a valid sequence type"
)
identifier = line[4:].strip()
description = next(stream).strip()
description = self.stream.readline()
if description == "":
raise StopIteration
else:
description = description.strip()
lines = []
for line in stream:
for line in self.stream:
if line[0] == ">":
self._line = line
break

View File

@ -933,9 +933,9 @@ def FastqGeneralIterator(source: _TextIOSource) -> Iterator[tuple[str, str, str]
with as_handle(source) as handle:
if handle.read(0) != "":
raise StreamModeError("Fastq files must be opened in text mode") from None
try:
line = next(handle)
except StopIteration:
line = handle.readline()
if line == "":
return # Premature end of file, or just empty?
while True:
@ -1031,15 +1031,11 @@ class FastqIteratorAbstractBaseClass(SequenceIterator[str]):
def __next__(self) -> SeqRecord:
"""Parse the file and generate SeqRecord objects."""
line = self.line
if line is None:
try:
line = next(self.stream)
except StopIteration: # empty file?
self.line = None
else:
self.line = line
if line is None:
line = self.stream.readline()
if not line:
raise StopIteration
if line[0] != "@":
raise ValueError("Records in Fastq files should start with '@' character")

View File

@ -301,7 +301,7 @@ please open an issue on GitHub or mention it on the mailing list.
- Sebastian Bassi <https://about.me/bassi>
- Sergei Lebedev <https://github.com/superbobry>
- Sergio Valqui <https://github.com/svalqui>
- Seth Sims <seth.sims at gmail>
- Seth Sims <https://github.com/xzy3>
- She Zhang <https://github.com/shz66>
- Shoichiro Kawauchi <https://github.com/lacrosse91>
- Shuichiro MAKIGAKI <https://github.com/shuichiro-makigaki>

View File

@ -10,6 +10,8 @@ import unittest
import warnings
from io import BytesIO
from io import StringIO
from tempfile import NamedTemporaryFile
from contextlib import ExitStack
from Bio import AlignIO
from Bio import BiopythonParserWarning
@ -301,6 +303,7 @@ class TestSeqIO(SeqIOTestBaseClass):
else:
debug = False
unequal_length = len({len(_) for _ in records}) != 1
for fmt in test_write_read_alignment_formats:
if fmt not in possible_unknown_seq_formats and len(records[0].seq) > 100:
try:
@ -318,135 +321,163 @@ class TestSeqIO(SeqIOTestBaseClass):
if molecule_type is not None:
for record in records1:
record.annotations["molecule_type"] = molecule_type
# Going to write to a handle...
mode = self.get_mode(fmt)
if mode == "t":
handle = StringIO()
elif mode == "b":
handle = BytesIO()
if unequal_length and fmt in AlignIO._FormatToWriter:
msg = "Sequences must all be the same length"
elif fmt in messages:
msg = messages[fmt]
elif debug:
msg = True
else:
msg = None
for test_type in ("IO", "NamedTemporaryFile"):
with ExitStack() as exit_stack:
# Going to write to a handle...
mode = self.get_mode(fmt)
if test_type == "IO":
if mode == "t":
handle = exit_stack.enter_context(StringIO())
elif mode == "b":
handle = exit_stack.enter_context(BytesIO())
if msg:
# Should fail.
if debug:
try:
SeqIO.write(sequences=records1, handle=handle, format=fmt)
except (ValueError, TypeError) as e:
messages[fmt] = str(e)
else:
message = f"{t_format} -> {fmt}"
with self.assertRaises(Exception, msg=message) as cm:
with warnings.catch_warnings():
# e.g. data loss
warnings.simplefilter("ignore", BiopythonWarning)
SeqIO.write(sequences=records1, handle=handle, format=fmt)
self.assertTrue(
isinstance(cm.exception, (ValueError, TypeError)), msg=message
)
self.assertEqual(str(cm.exception), msg, msg=message)
elif test_type == "NamedTemporaryFile":
file_mode = "wb+"
encoding = None
if mode == "t":
encoding = "utf8"
file_mode = "w+"
# Carry on to the next format:
continue
# Should pass...
with warnings.catch_warnings():
# e.g. data loss
warnings.simplefilter("ignore", BiopythonWarning)
c = SeqIO.write(sequences=records1, handle=handle, format=fmt)
self.assertEqual(c, len(records1))
handle.flush()
handle.seek(0)
# Now ready to read back from the handle...
try:
records2 = list(SeqIO.parse(handle=handle, format=fmt))
except ValueError as e:
# This is BAD. We can't read our own output.
# I want to see the output when called from the test harness,
# run_tests.py (which can be funny about new lines on Windows)
handle.seek(0)
message = f"{e!s}\n\n{handle.read()!r}\n\n{records1!r}"
self.fail(message)
self.assertEqual(len(records2), t_count)
for r1, r2 in zip(records1, records2):
# Check the bare minimum (ID and sequence) as
# many formats can't store more than that.
self.assertEqual(len(r1), len(r2))
# Check the sequence
try:
bytes(r1.seq)
except UndefinedSequenceError:
self.assertRaises(UndefinedSequenceError, bytes, r2.seq)
else:
if fmt in ["gb", "genbank", "embl", "imgt"]:
# The GenBank/EMBL parsers will convert to upper case.
self.assertEqual(r1.seq.upper(), r2.seq)
elif fmt == "qual":
self.assertRaises(UndefinedSequenceError, bytes, r2.seq)
else:
self.assertEqual(r1.seq, r2.seq)
# Beware of different quirks and limitations in the
# valid character sets and the identifier lengths!
if fmt in ["phylip", "phylip-sequential"]:
self.assertEqual(
PhylipIO.sanitize_name(r1.id, 10),
r2.id,
f"'{r1.id}' vs '{r2.id}'",
)
elif fmt == "phylip-relaxed":
self.assertEqual(
PhylipIO.sanitize_name(r1.id),
r2.id,
f"'{r1.id}' vs '{r2.id}'",
)
elif fmt == "clustal":
self.assertEqual(
r1.id.replace(" ", "_")[:30],
r2.id,
f"'{r1.id}' vs '{r2.id}'",
)
elif fmt == "stockholm":
r1_id = r1.id.replace(" ", "_")
if "start" in r1.annotations and "end" in r1.annotations:
suffix = "/%d-%d" % (
r1.annotations["start"],
r1.annotations["end"],
handle = exit_stack.enter_context(
NamedTemporaryFile(mode=file_mode, encoding=encoding)
)
if not r1_id.endswith(suffix):
r1_id += suffix
self.assertEqual(r1_id, r2.id, f"'{r1.id}' vs '{r2.id}'")
elif fmt == "maf":
self.assertEqual(
r1.id.replace(" ", "_"), r2.id, f"'{r1.id}' vs '{r2.id}'"
)
elif fmt in ["fasta", "fasta-2line"]:
self.assertEqual(r1.id.split()[0], r2.id)
elif fmt == "nib":
self.assertEqual(r2.id, "<unknown id>")
else:
self.assertEqual(r1.id, r2.id, f"'{r1.id}' vs '{r2.id}'")
else:
self.fail(f"test type is not recognized: {test_type}")
if unequal_length and fmt in AlignIO._FormatToWriter:
msg = "Sequences must all be the same length"
elif fmt in messages:
msg = messages[fmt]
elif debug:
msg = True
else:
msg = None
if msg:
# Should fail.
if debug:
try:
SeqIO.write(
sequences=records1, handle=handle, format=fmt
)
except (ValueError, TypeError) as e:
messages[fmt] = str(e)
else:
message = f"{t_format} -> {fmt}"
with self.assertRaises(Exception, msg=message) as cm:
with warnings.catch_warnings():
# e.g. data loss
warnings.simplefilter("ignore", BiopythonWarning)
SeqIO.write(
sequences=records1, handle=handle, format=fmt
)
self.assertTrue(
isinstance(cm.exception, (ValueError, TypeError)),
msg=message,
)
self.assertEqual(str(cm.exception), msg, msg=message)
# Carry on to the next format:
continue
# Should pass...
with warnings.catch_warnings():
# e.g. data loss
warnings.simplefilter("ignore", BiopythonWarning)
c = SeqIO.write(sequences=records1, handle=handle, format=fmt)
self.assertEqual(c, len(records1))
handle.flush()
handle.seek(0)
# Now ready to read back from the handle...
try:
records2 = list(SeqIO.parse(handle=handle, format=fmt))
except ValueError as e:
# This is BAD. We can't read our own output.
# I want to see the output when called from the test harness,
# run_tests.py (which can be funny about new lines on Windows)
handle.seek(0)
message = f"{e!s}\n\n{handle.read()!r}\n\n{records1!r}"
self.fail(message)
self.assertEqual(len(records2), t_count)
for r1, r2 in zip(records1, records2):
# Check the bare minimum (ID and sequence) as
# many formats can't store more than that.
self.assertEqual(len(r1), len(r2))
# Check the sequence
try:
bytes(r1.seq)
except UndefinedSequenceError:
self.assertRaises(UndefinedSequenceError, bytes, r2.seq)
else:
if fmt in ["gb", "genbank", "embl", "imgt"]:
# The GenBank/EMBL parsers will convert to upper case.
self.assertEqual(r1.seq.upper(), r2.seq)
elif fmt == "qual":
self.assertRaises(UndefinedSequenceError, bytes, r2.seq)
else:
self.assertEqual(r1.seq, r2.seq)
# Beware of different quirks and limitations in the
# valid character sets and the identifier lengths!
if fmt in ["phylip", "phylip-sequential"]:
self.assertEqual(
PhylipIO.sanitize_name(r1.id, 10),
r2.id,
f"'{r1.id}' vs '{r2.id}'",
)
elif fmt == "phylip-relaxed":
self.assertEqual(
PhylipIO.sanitize_name(r1.id),
r2.id,
f"'{r1.id}' vs '{r2.id}'",
)
elif fmt == "clustal":
self.assertEqual(
r1.id.replace(" ", "_")[:30],
r2.id,
f"'{r1.id}' vs '{r2.id}'",
)
elif fmt == "stockholm":
r1_id = r1.id.replace(" ", "_")
if "start" in r1.annotations and "end" in r1.annotations:
suffix = "/%d-%d" % (
r1.annotations["start"],
r1.annotations["end"],
)
if not r1_id.endswith(suffix):
r1_id += suffix
self.assertEqual(r1_id, r2.id, f"'{r1.id}' vs '{r2.id}'")
elif fmt == "maf":
self.assertEqual(
r1.id.replace(" ", "_"),
r2.id,
f"'{r1.id}' vs '{r2.id}'",
)
elif fmt in ["fasta", "fasta-2line"]:
self.assertEqual(r1.id.split()[0], r2.id)
elif fmt == "nib":
self.assertEqual(r2.id, "<unknown id>")
else:
self.assertEqual(r1.id, r2.id, f"'{r1.id}' vs '{r2.id}'")
if len(records1) > 1:
# Try writing just one record (passing a SeqRecord, not a list)
if mode == "t":
handle = StringIO()
elif mode == "b":
handle = BytesIO()
with warnings.catch_warnings():
warnings.simplefilter("ignore", BiopythonWarning)
SeqIO.write(records1[0], handle, fmt)
if mode == "t":
self.assertEqual(
handle.getvalue(), records1[0].format(fmt)
)
if len(records1) > 1:
# Try writing just one record (passing a SeqRecord, not a list)
if mode == "t":
handle = StringIO()
elif mode == "b":
handle = BytesIO()
with warnings.catch_warnings():
warnings.simplefilter("ignore", BiopythonWarning)
SeqIO.write(records1[0], handle, fmt)
if mode == "t":
self.assertEqual(handle.getvalue(), records1[0].format(fmt))
if debug:
self.fail(
f"Update {t_format} test to use this dict:\nmessages = {messages!r}"