- Implemented lazy _per_letter_annotation construction to speed up SeqRecord initialization (if you directly access the protected _per_letter_annotation, be aware that it can now be None. This corresponds to an empty dict)

- Added private alternative constructor SeqRecord._from_validated(...). This avoids type and length checks in cases when these have already been done
- Modified SeqRecord seq and letter_annotation attributes to use decorator-based formatting, and included more type annotations
- Modified sam parser to explicitly construct a new SeqRecord to remove the deepcopy bottleneck
- Modified FastaIterator, QualityIO, and SffIO parsers to use _from_validated to redundant checks and make code cleaner
- Added explicit checks in SeqRecord methods for cases when Seq is None, and specific error messages for those cases
This commit is contained in:
flaar94
2024-09-13 10:26:15 -07:00
committed by Peter Cock
parent 824c34399c
commit e6d24c595a
7 changed files with 214 additions and 104 deletions

View File

@ -698,15 +698,24 @@ class AlignmentIterator(interfaces.AlignmentIterator):
number = int(number)
target += seq[:number]
seq = target
index = self._target_indices[rname]
target = copy.deepcopy(self.targets[index])
length = len(target.seq)
rname_target = self.targets[self._target_indices[rname]]
length = len(rname_target.seq)
data = {}
index = 0
for start, size in zip(starts, sizes):
data[start] = seq[index : index + size]
index += size
target.seq = Seq(data, length=length)
target = SeqRecord._from_validated(
Seq(data, length=length),
rname_target.id,
rname_target.name,
rname_target.description,
annotations={
key: copy.copy(val)
for key, val in rname_target.annotations.items()
},
)
if coordinates is not None:
coordinates = np.array(coordinates).transpose()
if strand == "-":

View File

@ -123,7 +123,6 @@ assert _split(
"181647..181905",
]
_pair_location = r"[<>]?-?\d+\.\.[<>]?-?\d+"
_between_location = r"\d+\^\d+"

View File

@ -260,7 +260,7 @@ class FastaIterator(SequenceIterator):
assert not title, repr(title)
# Should we use SeqRecord default for no ID?
first_word = ""
return SeqRecord(
return SeqRecord._from_validated(
Seq(sequence), id=first_word, name=first_word, description=title
)

View File

@ -363,7 +363,6 @@ import warnings
from math import log
from abc import abstractmethod
from typing import Callable
from typing import IO
from collections.abc import Iterator
from collections.abc import Mapping
from typing import Optional
@ -531,14 +530,14 @@ def phred_quality_from_solexa(solexa_quality: float) -> float:
return 10 * log(10 ** (solexa_quality / 10.0) + 1, 10)
def _get_phred_quality(record: SeqRecord) -> Union[list[float], list[int]]:
def _get_phred_quality(record: SeqRecord) -> Union[list[int], list[float]]:
"""Extract PHRED qualities from a SeqRecord's letter_annotations (PRIVATE).
If there are no PHRED qualities, but there are Solexa qualities, those are
used instead after conversion.
"""
try:
return record.letter_annotations["phred_quality"]
return record.letter_annotations["phred_quality"] # type: ignore
except KeyError:
pass
try:
@ -1091,18 +1090,20 @@ class FastqIteratorAbstractBaseClass(SequenceIterator[str]):
descr = title_line
id = descr.split()[0]
name = id
record = SeqRecord(Seq(seq_string), id=id, name=name, description=descr)
q_mapping = self.q_mapping
try:
qualities = [q_mapping[letter2] for letter2 in quality_string]
except KeyError:
raise ValueError("Invalid character in quality string") from None
# For speed, will now use a dirty trick to speed up assigning the
# qualities. We do this to bypass the length check imposed by the
# per-letter-annotations restricted dict (as this has already been
# checked by FastqGeneralIterator). This is equivalent to:
# record.letter_annotations["phred_quality"] = qualities
dict.__setitem__(record._per_letter_annotations, self.q_key, qualities)
# # Avoid length/type checking
record = SeqRecord._from_validated(
Seq(seq_string),
id=id,
name=name,
description=descr,
letter_annotations={self.q_key: qualities},
)
return record
@ -1526,10 +1527,15 @@ class QualPhredIterator(SequenceIterator):
# Return the record and then continue...
sequence = Seq(None, length=len(qualities))
record = SeqRecord(sequence, id=id, name=name, description=descr)
# Dirty trick to speed up this line:
# record.letter_annotations["phred_quality"] = qualities
dict.__setitem__(record._per_letter_annotations, "phred_quality", qualities)
# Avoid unnecessary length/type checks
record = SeqRecord._from_validated(
sequence,
id=id,
name=name,
description=descr,
letter_annotations={"phred_quality": qualities},
)
return record

View File

@ -1006,12 +1006,15 @@ class SffIterator(SequenceIterator):
annotations["region"] = _get_read_region(name)
annotations["coords"] = _get_read_xy(name)
annotations["molecule_type"] = "DNA"
record = SeqRecord(
Seq(seq), id=name, name=name, description="", annotations=annotations
# Avoids type and length checks
record = SeqRecord._from_validated(
Seq(seq),
id=name,
name=name,
description="",
annotations=annotations,
letter_annotations={"phred_quality": quals},
)
# Dirty trick to speed up this line:
# record.letter_annotations["phred_quality"] = quals
dict.__setitem__(record._per_letter_annotations, "phred_quality", quals)
# Return the record and then continue...
return record

View File

@ -17,8 +17,7 @@ import warnings
from io import StringIO
from typing import Any
from typing import cast
from collections.abc import Iterable
from collections.abc import Mapping
from collections.abc import Iterator
from typing import NoReturn
from typing import Optional
from typing import overload
@ -179,6 +178,7 @@ class SeqRecord:
annotations: _AnnotationsDict
dbxrefs: list[str]
_per_letter_annotations: Optional[_RestrictedDict]
def __init__(
self,
@ -251,21 +251,8 @@ class SeqRecord:
raise TypeError("annotations argument must be a dict or None")
self.annotations = annotations
if letter_annotations is None:
# annotations about each letter in the sequence
if seq is None:
# Should we allow this and use a normal unrestricted dict?
self._per_letter_annotations: _RestrictedDict = _RestrictedDict(
length=0
)
else:
try:
self._per_letter_annotations = _RestrictedDict(length=len(seq))
except TypeError:
raise TypeError(
"seq argument should be a Seq object or similar"
) from None
else:
self._per_letter_annotations = None
if letter_annotations is not None:
# This will be handled via the property set function, which will
# turn this into a _RestrictedDict and thus ensure all the values
# in the dict are the right length
@ -280,24 +267,9 @@ class SeqRecord:
)
self.features = features
# TODO - Just make this a read only property?
def _set_per_letter_annotations(self, value: Mapping[str, str]) -> None:
if not isinstance(value, dict):
raise TypeError(
"The per-letter-annotations should be a (restricted) dictionary."
)
# Turn this into a restricted-dictionary (and check the entries)
try:
self._per_letter_annotations = _RestrictedDict(length=len(self.seq))
except AttributeError:
# e.g. seq is None
self._per_letter_annotations = _RestrictedDict(length=0)
self._per_letter_annotations.update(value)
letter_annotations = property(
fget=lambda self: self._per_letter_annotations,
fset=_set_per_letter_annotations,
doc="""Dictionary of per-letter-annotation for the sequence.
@property
def letter_annotations(self) -> dict[str, Sequence[Any]]:
"""Dictionary of per-letter-annotation for the sequence.
For example, this can hold quality scores used in FASTQ or QUAL files.
Consider this example using Bio.SeqIO to read in an example Solexa
@ -345,10 +317,38 @@ class SeqRecord:
Note that if replacing the record's sequence with a sequence of a
different length you must first clear the letter_annotations dict.
""",
)
"""
if self._per_letter_annotations is None:
length = 0 if self.seq is None else len(self.seq)
self._per_letter_annotations = _RestrictedDict(length=length)
return self._per_letter_annotations
def _set_seq(self, value: Union["Seq", "MutableSeq"]) -> None:
# TODO - Just make this a read only property?
@letter_annotations.setter
def letter_annotations(self, value: dict[str, Sequence[Any]]) -> None:
if not isinstance(value, dict):
raise TypeError(
"The per-letter-annotations should be a (restricted) dictionary."
)
# Turn this into a restricted-dictionary (and check the entries)
length = 0 if self.seq is None else len(self.seq)
if any(len(val) != length for val in value.values()):
raise ValueError(
f"The per-letter-annotations have the same length as the sequence, but found: \n {','.join([f'{key}={val}' for key, val in value.items() if len(val) != length])}"
)
if self._per_letter_annotations is None:
self._per_letter_annotations = _RestrictedDict(length=length)
else:
self._per_letter_annotations.clear()
dict.update(self._per_letter_annotations, value) # type: ignore
@property
def seq(self) -> Optional[Union["Seq", "MutableSeq"]]:
"""The sequence itself, as a Seq or MutableSeq object."""
return self._seq
@seq.setter
def seq(self, value: Union["Seq", "MutableSeq"]) -> None:
# Adding this here for users who are not type-checking their code.
if not isinstance(value, (Seq, MutableSeq)):
warnings.warn(
@ -369,17 +369,58 @@ class SeqRecord:
else:
self._seq = value
# Reset the (empty) letter annotations dict with new length:
try:
self._per_letter_annotations = _RestrictedDict(length=len(self.seq))
except AttributeError:
# e.g. seq is None
self._per_letter_annotations = _RestrictedDict(length=0)
length = 0 if self.seq is None else len(self.seq)
self._per_letter_annotations = _RestrictedDict(length=length)
seq = property(
fget=lambda self: self._seq,
fset=_set_seq,
doc="The sequence itself, as a Seq or MutableSeq object.",
)
@classmethod
def _from_validated(
cls,
seq: Optional[Union[Seq, MutableSeq]],
id: Optional[str] = "<unknown id>",
name: str = "<unknown name>",
description: str = "<unknown description>",
dbxrefs: Optional[list[str]] = None,
features: Optional[list["SeqFeature"]] = None,
annotations: Optional[dict[str, Union[str, int]]] = None,
letter_annotations: Optional[dict[str, Sequence]] = None,
) -> "SeqRecord":
"""Faster constructor for post-validated data like copies or validated parsed data"""
if cls is not SeqRecord:
# If we subclassed, we'll default to that class's initializer just to be careful
return cls(
seq,
id,
name,
description,
dbxrefs,
features,
annotations,
letter_annotations,
)
inst = cls.__new__(cls)
inst._seq = seq
inst.id = id
inst.name = name
inst.description = description
if dbxrefs is None:
dbxrefs = []
inst.dbxrefs = dbxrefs
if features is None:
features = []
inst.features = features
if annotations is None:
annotations = {}
inst.annotations = annotations
inst._per_letter_annotations = None
if letter_annotations is not None:
length = 0 if seq is None else len(seq)
inst._per_letter_annotations = _RestrictedDict(length=length)
dict.update(inst._per_letter_annotations, letter_annotations) # type: ignore
return inst
@overload
def __getitem__(self, index: int) -> str: ...
@ -500,10 +541,14 @@ class SeqRecord:
# NOTE - The sequence level annotation like the id, name, etc
# do not really apply to a single character. However, should
# we try and expose any per-letter-annotation here? If so how?
if self.seq is None:
raise ValueError(
"Seq in SeqRecord is None, it doesn't support indexing"
)
return self.seq[index]
elif isinstance(index, slice):
if self.seq is None:
raise ValueError("If the sequence is None, we cannot slice it.")
raise ValueError("Seq in SeqRecord is None, we cannot slice it")
parent_length = len(self)
try:
from BioSQL.BioSeq import DBSeqRecord
@ -513,14 +558,14 @@ class SeqRecord:
biosql_available = False
if biosql_available and isinstance(self, DBSeqRecord):
answer = SeqRecord(
answer = SeqRecord._from_validated(
self.seq[index],
id=self.id,
name=self.name,
description=self.description,
)
else:
answer = self.__class__(
answer = self._from_validated(
self.seq[index],
id=self.id,
name=self.name,
@ -566,12 +611,12 @@ class SeqRecord:
# Slice all the values to match the sliced sequence
# (this should also work with strides, even negative strides):
for key, value in self.letter_annotations.items():
answer._per_letter_annotations[key] = value[index]
answer.letter_annotations[key] = value[index]
return answer
raise ValueError("Invalid index")
def __iter__(self) -> Iterable[Union["Seq", "MutableSeq"]]:
def __iter__(self) -> Iterator[str]:
"""Iterate over the letters in the sequence.
For example, using Bio.SeqIO to read in a protein FASTA file:
@ -623,7 +668,9 @@ class SeqRecord:
You may agree that using zip(rec.seq, ...) is more explicit than using
zip(rec, ...) as shown above.
"""
return iter(self.seq)
if self._seq is None:
raise ValueError("Seq in SeqRecord is None, can't iterate over it")
return iter(self._seq)
def __contains__(self, char: str) -> bool:
"""Implement the 'in' keyword, searches the sequence.
@ -652,10 +699,14 @@ class SeqRecord:
See also the Seq object's __contains__ method.
"""
return char in self.seq
if self._seq is None:
raise ValueError("Seq in SeqRecord is None, can't convert to bytes")
return char in self._seq
def __bytes__(self) -> bytes:
return bytes(self.seq)
if self._seq is None:
raise ValueError("Seq in SeqRecord is None, can't convert to bytes")
return bytes(self._seq)
def __str__(self) -> str:
"""Return a human readable summary of the record and its annotation (string).
@ -703,14 +754,18 @@ class SeqRecord:
lines.append(
"Per letter annotation for: " + ", ".join(self.letter_annotations)
)
try:
bytes(self.seq)
except UndefinedSequenceError:
lines.append(f"Undefined sequence of length {len(self.seq)}")
if self.seq is not None:
try:
bytes(self.seq)
except UndefinedSequenceError:
lines.append(f"Undefined sequence of length {len(self.seq)}")
else:
# Don't want to include the entire sequence
seq = repr(self.seq)
lines.append(seq)
else:
# Don't want to include the entire sequence
seq = repr(self.seq)
lines.append(seq)
lines.append("Missing Sequence (None)")
return "\n".join(lines)
def __repr__(self) -> str:
@ -834,7 +889,7 @@ class SeqRecord:
>>> len(record.seq)
309
"""
return len(self.seq)
return len(self._seq) if self._seq is not None else 0
def __lt__(self, other: Any) -> NoReturn:
"""Define the less-than operand (not implemented)."""
@ -947,11 +1002,15 @@ class SeqRecord:
>>> new.annotations = plasmid.annotations.copy()
>>> new.dbxrefs = plasmid.dbxrefs[:]
"""
if self._seq is None:
raise ValueError("Left operand seq=None, can't add")
if not isinstance(other, SeqRecord):
# Assume it is a string or a Seq.
# Note can't transfer any per-letter-annotations
return type(self)(
self.seq + other,
self._seq + other,
id=self.id,
name=self.name,
description=self.description,
@ -959,9 +1018,13 @@ class SeqRecord:
annotations=self.annotations.copy(),
dbxrefs=self.dbxrefs[:],
)
# Adding two SeqRecord objects... must merge annotation.
answer = type(self)(
self.seq + other.seq, features=self.features[:], dbxrefs=self.dbxrefs[:]
if other._seq is None:
raise ValueError("Right SeqRecord has seq=None, can't add")
# Adding two SeqRecord objects... must merge annotation
answer = self._from_validated(
self._seq + other._seq, features=self.features[:], dbxrefs=self.dbxrefs[:]
)
# Will take all the features and all the db cross refs,
length = len(self)
@ -971,6 +1034,7 @@ class SeqRecord:
for ref in other.dbxrefs:
if ref not in answer.dbxrefs:
answer.dbxrefs.append(ref)
# Take common id/name/description/annotation
if self.id == other.id:
answer.id = self.id
@ -982,9 +1046,16 @@ class SeqRecord:
if k in other.annotations and other.annotations[k] == v:
answer.annotations[k] = v
# Can append matching per-letter-annotation
for k, v in self.letter_annotations.items():
if k in other.letter_annotations:
answer.letter_annotations[k] = v + other.letter_annotations[k]
try:
# To make this type safe, we would need to make sure the types are compatible, eg: no adding tuples and str
for k, v in self.letter_annotations.items(): # type: ignore
if k in other.letter_annotations:
# avoid length checks, but otherwise equivalent to answer.letter_annotations[k] = v + other.letter_annotations[k]
dict.__setitem__(answer.letter_annotations, k, v + other.letter_annotations[k]) # type: ignore
except TypeError:
print("Failed while try to concatenate letter annotations")
raise
return answer
def __radd__(self, other: Union["Seq", "MutableSeq", str]) -> "SeqRecord":
@ -1012,11 +1083,13 @@ class SeqRecord:
"This should have happened via the __add__ of "
"the other SeqRecord being added!"
)
if self.seq is None:
raise TypeError("Can't add (right hand side) SeqRecord with seq = None")
# Assume it is a string or a Seq.
# Note can't transfer any per-letter-annotations
offset = len(other)
return type(self)(
other + self.seq,
cast(Union[Seq, MutableSeq], other + self.seq),
id=self.id,
name=self.name,
description=self.description,
@ -1031,6 +1104,10 @@ class SeqRecord:
Optional arguments start and end are interpreted as in slice notation.
This method behaves as the count method of Python strings.
"""
if self._seq is None:
raise ValueError(
"seq is SeqRecord is None, assign it a sequence before applying count"
)
return self.seq.count(sub, start, end)
def upper(self) -> "SeqRecord":
@ -1059,7 +1136,11 @@ class SeqRecord:
"#$%&'()
<BLANKLINE>
"""
return type(self)(
if self.seq is None:
raise ValueError(
"seq is SeqRecord is None, assign it a sequence before applying upper"
)
return self._from_validated(
self.seq.upper(),
id=self.id,
name=self.name,
@ -1102,7 +1183,11 @@ class SeqRecord:
>>> old.dbxrefs == new.dbxrefs
True
"""
return type(self)(
if self.seq is None:
raise ValueError(
"seq is SeqRecord is None, assign it a sequence before applying lower"
)
return self._from_validated(
self.seq.lower(),
id=self.id,
name=self.name,
@ -1278,8 +1363,11 @@ class SeqRecord:
>>> print("%s %s" % (rc.id, rc.seq))
Test ACGA
"""
from Bio.Seq import MutableSeq # Lazy to avoid circular imports
from Bio.Seq import Seq # Lazy to avoid circular imports
if self.seq is None:
raise ValueError(
"Seq in SeqRecord is None, so can't construct the reverse_complement. Please assign it a sequence first"
)
if "protein" in cast(str, self.annotations.get("molecule_type", "")):
raise ValueError("Proteins do not have complements!")
@ -1290,7 +1378,7 @@ class SeqRecord:
seq = self.seq.reverse_complement()
if isinstance(self.seq, MutableSeq):
seq = Seq(seq)
answer = type(self)(seq)
answer = self._from_validated(seq)
if isinstance(id, str):
answer.id = id
elif id:
@ -1314,6 +1402,7 @@ class SeqRecord:
# Copy the old features, adjusting location and string
length = len(answer)
answer.features = [f._flip(length) for f in self.features]
# The old list should have been sorted by start location,
# reversing it will leave it sorted by what is now the end position,
# so we need to resort in case of overlapping features.
@ -1338,7 +1427,7 @@ class SeqRecord:
elif letter_annotations:
# Copy the old per letter annotations, reversing them
for key, value in self.letter_annotations.items():
answer._per_letter_annotations[key] = value[::-1]
answer.letter_annotations[key] = value[::-1]
return answer
def translate(
@ -1405,6 +1494,10 @@ class SeqRecord:
"""
if "protein" == self.annotations.get("molecule_type", ""):
raise ValueError("Proteins cannot be translated!")
if self.seq is None:
raise ValueError("Seq in SeqRecord is None, can't be translated")
answer = SeqRecord(
self.seq.translate(
table=table, stop_symbol=stop_symbol, to_stop=to_stop, cds=cds, gap=gap

View File

@ -539,7 +539,7 @@ class DBSeqRecord(SeqRecord):
def __del_seq(self):
del self._seq
seq = property(__get_seq, __set_seq, __del_seq, "Seq object")
seq = property(__get_seq, __set_seq, __del_seq, "Seq object") # type: ignore
@property
def dbxrefs(self) -> list[str]: