Files
biopython/Bio/Compass/__init__.py

261 lines
8.6 KiB
Python

# Copyright 2004 by James Casbon. All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license. Please see the LICENSE file that should have been included
# as part of this package.
"""
Code to deal with COMPASS output, a program for profile/profile comparison.
Compass is described in:
Sadreyev R, Grishin N. COMPASS: a tool for comparison of multiple protein
alignments with assessment of statistical significance. J Mol Biol. 2003 Feb
7;326(1):317-36.
Tested with COMPASS 1.24.
Classes:
Record One result of a compass file
_Scanner Scan compass results
_Consumer Consume scanner events
RecordParser Parse one compass record
Iterator Iterate through a number of compass records
"""
from Bio import File
from Bio.ParserSupport import *
import re,string
class Record:
"""
Hold information from one compass hit.
Ali1 one is the query, Ali2 the hit.
"""
def __init__(self):
self.query=''
self.hit=''
self.gap_threshold=0
self.query_length=0
self.query_filtered_length=0
self.query_nseqs=0
self.query_neffseqs=0
self.hit_length=0
self.hit_filtered_length=0
self.hit_nseqs=0
self.hit_neffseqs=0
self.sw_score=0
self.evalue=-1
self.query_start=-1
self.hit_start=-1
self.query_aln=''
self.hit_aln=''
self.positives=''
def query_coverage(self):
"""Return the length of the query covered in alignment"""
s = string.replace(self.query_aln, "=", "")
return len(s)
def hit_coverage(self):
"""Return the length of the hit covered in the alignment"""
s = string.replace(self.hit_aln, "=", "")
return len(s)
class _Scanner:
"""Reads compass output and generate events"""
def feed(self, handle, consumer):
"""Feed in COMPASS ouput"""
if isinstance(handle, File.UndoHandle):
pass
else:
handle = File.UndoHandle(handle)
assert isinstance(handle, File.UndoHandle), \
"handle must be an UndoHandle"
if handle.peekline():
self._scan_record(handle, consumer)
def _scan_record(self,handle,consumer):
self._scan_names(handle, consumer)
self._scan_threshold(handle, consumer)
self._scan_lengths(handle,consumer)
self._scan_profilewidth(handle, consumer)
self._scan_scores(handle,consumer)
self._scan_alignment(handle,consumer)
def _scan_names(self,handle,consumer):
"""
Ali1: 60456.blo.gz.aln Ali2: allscop//14984.blo.gz.aln
"""
read_and_call(handle, consumer.names, contains="Ali1:")
def _scan_threshold(self,handle, consumer):
"""
Threshold of effective gap content in columns: 0.5
"""
read_and_call(handle, consumer.threshold, start="Threshold")
def _scan_lengths(self,handle, consumer):
"""
length1=388 filtered_length1=386 length2=145 filtered_length2=137
"""
read_and_call(handle, consumer.lengths, start="length1=")
def _scan_profilewidth(self,handle,consumer):
"""
Nseqs1=399 Neff1=12.972 Nseqs2=1 Neff2=6.099
"""
read_and_call(handle, consumer.profilewidth, contains="Nseqs1")
def _scan_scores(self,handle, consumer):
"""
Smith-Waterman score = 37 Evalue = 5.75e+02
"""
read_and_call(handle, consumer.scores, start="Smith-Waterman")
def _scan_alignment(self,handle, consumer):
"""
QUERY 2 LSDRLELVSASEIRKLFDIAAGMKDVISLGIGEPDFDTPQHIKEYAKEALDKGLTHYGPN
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
QUERY 2 LSDRLELVSASEIRKLFDIAAGMKDVISLGIGEPDFDTPQHIKEYAKEALDKGLTHYGPN
QUERY IGLLELREAIAEKLKKQNGIEADPKTEIMVLLGANQAFLMGLSAFLKDGEEVLIPTPAFV
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
QUERY IGLLELREAIAEKLKKQNGIEADPKTEIMVLLGANQAFLMGLSAFLKDGEEVLIPTPAFV
"""
while 1:
line = handle.readline()
if not line:
break
if is_blank_line(line):
continue
else:
consumer.query_alignment(line)
read_and_call(handle, consumer.positive_alignment)
read_and_call(handle, consumer.hit_alignment)
class _Consumer:
# all regular expressions used -- compile only once
_re_names = re.compile("Ali1:\s+(\S+)\s+Ali2:\s+(\S+)\s+")
_re_threshold = \
re.compile("Threshold of effective gap content in columns: (\S+)")
_re_lengths = \
re.compile("length1=(\S+)\s+filtered_length1=(\S+)\s+length2=(\S+)"
+ "\s+filtered_length2=(\S+)")
_re_profilewidth = \
re.compile("Nseqs1=(\S+)\s+Neff1=(\S+)\s+Nseqs2=(\S+)\s+Neff2=(\S+)")
_re_scores = re.compile("Smith-Waterman score = (\S+)\s+Evalue = (\S+)")
_re_start = re.compile("(\d+)")
_re_align = re.compile("^.{15}(\S+)")
_re_positive_alignment = re.compile("^.{15}(.+)")
def __init__(self):
self.data = None
def names(self, line):
"""
Ali1: 60456.blo.gz.aln Ali2: allscop//14984.blo.gz.aln
------query----- -------hit-------------
"""
self.data = Record()
m = self.__class__._re_names.search(line)
self.data.query = m.group(1)
self.data.hit = m.group(2)
def threshold(self,line):
m = self.__class__._re_threshold.search(line)
self.data.gap_threshold = float(m.group(1))
def lengths(self,line):
m = self.__class__._re_lengths.search(line)
self.data.query_length = int(m.group(1))
self.data.query_filtered_length = float(m.group(2))
self.data.hit_length = int(m.group(3))
self.data.hit_filtered_length = float(m.group(4))
def profilewidth(self,line):
m = self.__class__._re_profilewidth.search(line)
self.data.query_nseqs = int(m.group(1))
self.data.query_neffseqs = float(m.group(2))
self.data.hit_nseqs = int(m.group(3))
self.data.hit_neffseqs = float(m.group(4))
def scores(self, line):
m = self.__class__._re_scores.search(line)
if m:
self.data.sw_score = int(m.group(1))
self.data.evalue = float(m.group(2))
else:
self.data.sw_score = 0
self.data.evalue = -1.0
def query_alignment(self, line):
m = self.__class__._re_start.search(line)
if m:
self.data.query_start = int(m.group(1))
m = self.__class__._re_align.match(line)
assert m!=None, "invalid match"
self.data.query_aln = self.data.query_aln + m.group(1)
def positive_alignment(self,line):
m = self.__class__._re_positive_alignment.match(line)
assert m!=None, "invalid match"
self.data.positives = self.data.positives + m.group(1)
def hit_alignment(self,line):
m = self.__class__._re_start.search(line)
if m:
self.data.hit_start = int(m.group(1))
m = self.__class__._re_align.match(line)
assert m!=None, "invalid match"
self.data.hit_aln = self.data.hit_aln + m.group(1)
class RecordParser(AbstractParser):
"""Parses compass results into a Record object.
"""
def __init__(self):
self._scanner = _Scanner()
self._consumer = _Consumer()
def parse(self, handle):
if isinstance(handle, File.UndoHandle):
uhandle = handle
else:
uhandle = File.UndoHandle(handle)
self._scanner.feed(uhandle, self._consumer)
return self._consumer.data
class Iterator:
"""Iterate through a file of compass results"""
def __init__(self, handle):
self._uhandle = File.UndoHandle(handle)
self._parser = RecordParser()
def next(self):
lines = []
while 1:
line = self._uhandle.readline()
if not line:
break
if line[0:4] == "Ali1" and lines:
self._uhandle.saveline(line)
break
lines.append(line)
if not lines:
return None
data = string.join(lines, '')
return self._parser.parse(File.StringHandle(data))