mirror of
https://github.com/biopython/biopython.git
synced 2025-10-20 13:43:47 +08:00
Modified quality mapping in QualityIO to use faster string-translate method, removed "min" bottleneck in FastqIteratorAbstractBaseClass by replacing with faster string methods, and added more detailed error messaging with a custom InvalidCharError error class
This commit is contained in:
@ -362,12 +362,15 @@ are approximately equal.
|
||||
import warnings
|
||||
from math import log
|
||||
from abc import abstractmethod
|
||||
from typing import Callable
|
||||
from typing import Callable, Any
|
||||
from typing import IO
|
||||
from collections.abc import Iterator
|
||||
from collections.abc import Mapping
|
||||
from typing import Optional
|
||||
from collections.abc import Sequence
|
||||
from typing import Union
|
||||
from typing import Union, Iterable
|
||||
import array
|
||||
from dataclasses import dataclass
|
||||
|
||||
from Bio import BiopythonParserWarning
|
||||
from Bio import BiopythonWarning
|
||||
@ -386,6 +389,8 @@ from .Interfaces import SequenceWriter
|
||||
# Solexa offsets.
|
||||
SANGER_SCORE_OFFSET = 33
|
||||
SOLEXA_SCORE_OFFSET = 64
|
||||
INVALID_CHAR_CODE = 200
|
||||
INVALID_CHAR = bytes((INVALID_CHAR_CODE,))
|
||||
|
||||
|
||||
def solexa_quality_from_phred(phred_quality: float) -> float:
|
||||
@ -530,7 +535,7 @@ def phred_quality_from_solexa(solexa_quality: float) -> float:
|
||||
return 10 * log(10 ** (solexa_quality / 10.0) + 1, 10)
|
||||
|
||||
|
||||
def _get_phred_quality(record: SeqRecord) -> Union[list[int], list[float]]:
|
||||
def _get_phred_quality(record: SeqRecord) -> Union[list[float], list[int]]:
|
||||
"""Extract PHRED qualities from a SeqRecord's letter_annotations (PRIVATE).
|
||||
|
||||
If there are no PHRED qualities, but there are Solexa qualities, those are
|
||||
@ -1053,10 +1058,9 @@ class FastqIteratorAbstractBaseClass(SequenceIterator[str]):
|
||||
second_title = line[1:].rstrip()
|
||||
if second_title and second_title != title_line:
|
||||
raise ValueError("Sequence and quality captions differ.")
|
||||
seq_string = seq_string.encode() # type: ignore
|
||||
# This is going to slow things down a little, but assuming
|
||||
# this isn't allowed we should try and catch it here:
|
||||
if seq_string and min(seq_string) < ord("!"): # type: ignore
|
||||
|
||||
# Note: str.isprintable is False for ASCII characters 0-32 and 127
|
||||
if not seq_string.isprintable() or " " in seq_string: # type: ignore
|
||||
# first printable character
|
||||
raise ValueError("Whitespace is not allowed in the sequence.")
|
||||
|
||||
@ -1082,23 +1086,40 @@ class FastqIteratorAbstractBaseClass(SequenceIterator[str]):
|
||||
raise ValueError("Unexpected end of file")
|
||||
self.line = None
|
||||
|
||||
if seq_len != len(quality_string):
|
||||
raise ValueError(
|
||||
"Lengths of sequence and quality values differs for %s (%i and %i)."
|
||||
% (title_line, seq_len, len(quality_string))
|
||||
)
|
||||
descr = title_line
|
||||
id = descr.split()[0]
|
||||
name = id
|
||||
q_mapping = self.q_mapping
|
||||
try:
|
||||
qualities = [q_mapping[letter2] for letter2 in quality_string]
|
||||
except KeyError:
|
||||
raise ValueError("Invalid character in quality string") from None
|
||||
|
||||
# # Avoid length/type checking
|
||||
if not quality_string.isascii():
|
||||
# Look for invalid non-ascii characters
|
||||
index = _find_index_where(quality_string, lambda c: not c.isascii())
|
||||
assert index >= 0, "Non-ascii char in qualities not found. Biopython bug?"
|
||||
|
||||
details = "is not an ASCII character"
|
||||
raise InvalidCharError(quality_string, index, details)
|
||||
|
||||
if len(quality_string) != seq_len:
|
||||
# This should happen after ascii check, because non-ascii characters will often trigger this
|
||||
raise ValueError(
|
||||
f"Lengths of sequence and quality values differs for {title_line} ({seq_len} and {len(quality_string)})."
|
||||
)
|
||||
|
||||
byte_scores = quality_string.encode().translate(self.q_mapping)
|
||||
|
||||
if INVALID_CHAR in byte_scores:
|
||||
# Look for invalid but still ascii characters
|
||||
invalid_index = byte_scores.find(INVALID_CHAR_CODE)
|
||||
|
||||
details = "not in correct range (are you sure you're using the right QualityIO parser?)"
|
||||
raise InvalidCharError(quality_string, invalid_index, details)
|
||||
|
||||
# Pass through (standard library) array to handle negative scores from old quality formats
|
||||
qualities = array.array("b", byte_scores).tolist()
|
||||
|
||||
# SeqRecord._from_validated avoids length/type checking
|
||||
# .encode isn't strictly necessary (Seq init can handle a string), but it is faster to pre-encode
|
||||
record = SeqRecord._from_validated(
|
||||
Seq(seq_string),
|
||||
Seq(seq_string.encode()),
|
||||
id=id,
|
||||
name=name,
|
||||
description=descr,
|
||||
@ -1116,10 +1137,14 @@ class FastqPhredIterator(FastqIteratorAbstractBaseClass):
|
||||
# qualities = [ord(letter)-SANGER_SCORE_OFFSET for letter in quality_string]
|
||||
#
|
||||
# Precomputing is faster, perhaps partly by avoiding the subtractions.
|
||||
q_mapping = {
|
||||
chr(letter): letter - SANGER_SCORE_OFFSET
|
||||
for letter in range(SANGER_SCORE_OFFSET, 94 + SANGER_SCORE_OFFSET)
|
||||
}
|
||||
q_mapping = bytes(
|
||||
(
|
||||
letter - SANGER_SCORE_OFFSET
|
||||
if SANGER_SCORE_OFFSET <= letter < 94 + SANGER_SCORE_OFFSET
|
||||
else INVALID_CHAR_CODE
|
||||
)
|
||||
for letter in range(256)
|
||||
)
|
||||
|
||||
q_key = "phred_quality"
|
||||
|
||||
@ -1213,10 +1238,15 @@ class FastqSolexaIterator(FastqIteratorAbstractBaseClass):
|
||||
These files differ in the quality mapping.
|
||||
"""
|
||||
|
||||
q_mapping = {
|
||||
chr(letter): letter - SOLEXA_SCORE_OFFSET
|
||||
for letter in range(SOLEXA_SCORE_OFFSET - 5, 63 + SOLEXA_SCORE_OFFSET)
|
||||
}
|
||||
# For negative numbers, will need to interpret as a signed integer byte
|
||||
q_mapping = bytes(
|
||||
(
|
||||
(letter - SOLEXA_SCORE_OFFSET) % 256
|
||||
if SOLEXA_SCORE_OFFSET - 5 <= letter < 63 + SOLEXA_SCORE_OFFSET
|
||||
else INVALID_CHAR_CODE
|
||||
)
|
||||
for letter in range(256)
|
||||
)
|
||||
|
||||
q_key = "solexa_quality"
|
||||
|
||||
@ -1373,10 +1403,14 @@ class FastqIlluminaIterator(FastqIteratorAbstractBaseClass):
|
||||
These files differ in the quality mapping.
|
||||
"""
|
||||
|
||||
q_mapping = {
|
||||
chr(letter): letter - SOLEXA_SCORE_OFFSET
|
||||
for letter in range(SOLEXA_SCORE_OFFSET, 63 + SOLEXA_SCORE_OFFSET)
|
||||
}
|
||||
q_mapping = bytes(
|
||||
(
|
||||
letter - SOLEXA_SCORE_OFFSET
|
||||
if SOLEXA_SCORE_OFFSET <= letter < 63 + SOLEXA_SCORE_OFFSET
|
||||
else INVALID_CHAR_CODE
|
||||
)
|
||||
for letter in range(256)
|
||||
)
|
||||
|
||||
q_key = "phred_quality"
|
||||
|
||||
@ -1413,7 +1447,7 @@ class FastqIlluminaIterator(FastqIteratorAbstractBaseClass):
|
||||
>>> record2 = SeqIO.read("Quality/solexa_faked.fastq", "fastq-illumina")
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
ValueError: Invalid character in quality string
|
||||
Bio.SeqIO.QualityIO.InvalidCharError: Invalid character (?) or (0x3f) in quality string not in correct range (are you sure you're using the right QualityIO parser?) with context: [...BA@?>=<...]
|
||||
|
||||
NOTE - True Sanger style FASTQ files use PHRED scores with an offset
|
||||
of 33.
|
||||
@ -2105,7 +2139,13 @@ def _fastq_generic(
|
||||
# map the qual...
|
||||
qual = old_qual.translate(mapping)
|
||||
if null in qual:
|
||||
raise ValueError("Invalid character in quality string")
|
||||
invalid_index = qual.find(null)
|
||||
raise InvalidCharError(
|
||||
old_qual,
|
||||
invalid_index,
|
||||
details="not in correct range (are you sure you're using the right QualityIO parser?)",
|
||||
)
|
||||
|
||||
out_handle.write(f"@{title}\n{seq}\n+\n{qual}\n")
|
||||
return count
|
||||
|
||||
@ -2127,7 +2167,12 @@ def _fastq_generic2(
|
||||
# map the qual...
|
||||
qual = old_qual.translate(mapping)
|
||||
if null in qual:
|
||||
raise ValueError("Invalid character in quality string")
|
||||
invalid_index = qual.find(null)
|
||||
raise InvalidCharError(
|
||||
old_qual,
|
||||
invalid_index,
|
||||
details="not in correct range (are you sure you're using the right QualityIO parser?)",
|
||||
)
|
||||
if truncate_char in qual:
|
||||
qual = qual.replace(truncate_char, chr(126))
|
||||
warnings.warn(truncate_msg, BiopythonWarning)
|
||||
@ -2387,7 +2432,13 @@ def _fastq_convert_qual(
|
||||
try:
|
||||
qualities_strs = [mapping[ascii_] for ascii_ in qual]
|
||||
except KeyError:
|
||||
raise ValueError("Invalid character in quality string") from None
|
||||
invalid_index = _find_index_where(qual, lambda x: x not in mapping)
|
||||
assert invalid_index >= 0, "Invalid char not in mapping not found!"
|
||||
raise InvalidCharError(
|
||||
qual,
|
||||
invalid_index,
|
||||
details="not in correct range (are you sure you're using the right QualityIO parser?)",
|
||||
) from None
|
||||
data = " ".join(qualities_strs)
|
||||
while len(data) > 60:
|
||||
# Know quality scores are either 1 or 2 digits, so there
|
||||
@ -2429,6 +2480,45 @@ def _fastq_illumina_convert_qual(
|
||||
return _fastq_convert_qual(in_file, out_file, mapping)
|
||||
|
||||
|
||||
@dataclass
|
||||
class InvalidCharError(ValueError):
|
||||
"""
|
||||
Custom error for strings that have a character that is invalid for whatever reason (eg: non-ascii, invalid range)
|
||||
|
||||
Main attributes:
|
||||
- full_string - the string which contains the invalid character (str)
|
||||
- index - position of the invalid character in full_string (int)
|
||||
- details - additional information to add to the error message. Like: 'not in correct range' (str)
|
||||
- r - how many characters on each side of the invalid character to include in the error message (int)
|
||||
"""
|
||||
|
||||
full_string: str
|
||||
index: int
|
||||
details: str
|
||||
r: int = 3
|
||||
|
||||
def __str__(self) -> str:
|
||||
char = self.full_string[self.index]
|
||||
|
||||
surrounding_characters = self.full_string[
|
||||
max(self.index - self.r, 0) : self.index + self.r + 1
|
||||
]
|
||||
left_complete = self.index - self.r < 0
|
||||
prefix = "" if left_complete else "..."
|
||||
right_complete = self.index + self.r + 1 >= len(self.full_string)
|
||||
suffix = "" if right_complete else "..."
|
||||
|
||||
return f"Invalid character ({char}) or (0x{char.encode().hex()}) in quality string {self.details} with context: [{prefix}{surrounding_characters}{suffix}]"
|
||||
|
||||
|
||||
def _find_index_where(iterable: Iterable, predicate: Callable[[Any], bool]) -> int:
|
||||
for i, x in enumerate(iterable):
|
||||
if predicate(x) is True:
|
||||
return i
|
||||
|
||||
return -1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from Bio._utils import run_doctest
|
||||
|
||||
|
@ -224,6 +224,7 @@ please open an issue on GitHub or mention it on the mailing list.
|
||||
- Luca Monari <https://github.com/Lucandia>
|
||||
- Lucas Sinclair <https://github.com/xapple>
|
||||
- Lukasz Walejko <https://github.com/lwalejko>
|
||||
- Lyn H. <http://github.com/flaar94>
|
||||
- Manuel Lera Ramirez <https://github.com/manulera>
|
||||
- Manuel Nuno Melo <https://github.com/mnmelo>
|
||||
- Marc Colosimo <mcolosimo at domain mitre.org>
|
||||
|
Reference in New Issue
Block a user