mirror of
https://github.com/biopython/biopython.git
synced 2025-10-20 21:53:47 +08:00
$ ruff check --fix --select=I \ --config=lint.isort.force-single-line=true \ --config=lint.isort.order-by-type=false \ BioSQL/ Bio/ Tests/ Scripts/ Doc/ setup.py Using ruff version 0.4.10
844 lines
31 KiB
Python
844 lines
31 KiB
Python
# Copyright 2002 by Andrew Dalke. All rights reserved.
|
|
# Revisions 2007-2016 copyright by Peter Cock. All rights reserved.
|
|
# Revisions 2009 copyright by Cymon J. Cox. All rights reserved.
|
|
# Revisions 2013-2014 copyright by Tiago Antao. All rights reserved.
|
|
#
|
|
# This file is part of the Biopython distribution and governed by your
|
|
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
|
|
# Please see the LICENSE file that should have been included as part of this
|
|
# package.
|
|
#
|
|
# Note that BioSQL (including the database schema and scripts) is
|
|
# available and licensed separately. Please consult www.biosql.org
|
|
"""Connect with a BioSQL database and load Biopython like objects from it.
|
|
|
|
This provides interfaces for loading biological objects from a relational
|
|
database, and is compatible with the BioSQL standards.
|
|
"""
|
|
|
|
import os
|
|
|
|
from . import BioSeq
|
|
from . import DBUtils
|
|
from . import Loader
|
|
|
|
_POSTGRES_RULES_PRESENT = False # Hack for BioSQL Bug 2839
|
|
|
|
|
|
def open_database(driver="MySQLdb", **kwargs):
|
|
"""Load an existing BioSQL-style database.
|
|
|
|
This function is the easiest way to retrieve a connection to a
|
|
database, doing something like::
|
|
|
|
from BioSQL import BioSeqDatabase
|
|
server = BioSeqDatabase.open_database(user="root", db="minidb")
|
|
|
|
Arguments:
|
|
- driver - The name of the database driver to use for connecting. The
|
|
driver should implement the python DB API. By default, the MySQLdb
|
|
driver is used.
|
|
- user -the username to connect to the database with.
|
|
- password, passwd - the password to connect with
|
|
- host - the hostname of the database
|
|
- database or db - the name of the database
|
|
|
|
"""
|
|
if driver == "psycopg":
|
|
raise ValueError(
|
|
"Using BioSQL with psycopg (version one) is no "
|
|
"longer supported. Use psycopg2 instead."
|
|
)
|
|
|
|
if os.name == "java":
|
|
from com.ziclix.python.sql import zxJDBC
|
|
|
|
module = zxJDBC
|
|
if driver in ["MySQLdb"]:
|
|
jdbc_driver = "com.mysql.jdbc.Driver"
|
|
url_pref = "jdbc:mysql://" + kwargs["host"] + "/"
|
|
elif driver in ["psycopg2"]:
|
|
jdbc_driver = "org.postgresql.Driver"
|
|
url_pref = "jdbc:postgresql://" + kwargs["host"] + "/"
|
|
|
|
else:
|
|
module = __import__(driver, fromlist=["connect"])
|
|
connect = module.connect
|
|
|
|
# Different drivers use different keywords...
|
|
kw = kwargs.copy()
|
|
if driver in ["MySQLdb", "mysql.connector"] and os.name != "java":
|
|
if "database" in kw:
|
|
kw["db"] = kw["database"]
|
|
del kw["database"]
|
|
if "password" in kw:
|
|
kw["passwd"] = kw["password"]
|
|
del kw["password"]
|
|
# kw["charset"] = "utf8"
|
|
# kw["use_unicode"] = True
|
|
else:
|
|
# DB-API recommendations
|
|
if "db" in kw:
|
|
kw["database"] = kw["db"]
|
|
del kw["db"]
|
|
if "passwd" in kw:
|
|
kw["password"] = kw["passwd"]
|
|
del kw["passwd"]
|
|
if driver in ["psycopg2", "pgdb"] and not kw.get("database"):
|
|
kw["database"] = "template1"
|
|
# SQLite connect takes the database name as input
|
|
if os.name == "java":
|
|
if driver in ["MySQLdb"]:
|
|
conn = connect(
|
|
url_pref + kw.get("database", "mysql"),
|
|
kw["user"],
|
|
kw["password"],
|
|
jdbc_driver,
|
|
)
|
|
elif driver in ["psycopg2"]:
|
|
conn = connect(
|
|
url_pref + kw.get("database", "postgresql") + "?stringtype=unspecified",
|
|
kw["user"],
|
|
kw["password"],
|
|
jdbc_driver,
|
|
)
|
|
elif driver in ["sqlite3"]:
|
|
conn = connect(kw["database"])
|
|
else:
|
|
conn = connect(**kw)
|
|
|
|
if os.name == "java":
|
|
server = DBServer(conn, module, driver)
|
|
else:
|
|
server = DBServer(conn, module)
|
|
|
|
# Sets MySQL to allow double quotes, rather than only backticks
|
|
if driver in ["MySQLdb", "mysql.connector"]:
|
|
server.adaptor.execute("SET sql_mode='ANSI_QUOTES';")
|
|
|
|
# TODO - Remove the following once BioSQL Bug 2839 is fixed.
|
|
# Test for RULES in PostgreSQL schema, see also Bug 2833.
|
|
if driver in ["psycopg2", "pgdb"]:
|
|
sql = (
|
|
"SELECT ev_class FROM pg_rewrite WHERE "
|
|
"rulename='rule_bioentry_i1' OR "
|
|
"rulename='rule_bioentry_i2';"
|
|
)
|
|
if server.adaptor.execute_and_fetchall(sql):
|
|
import warnings
|
|
|
|
from Bio import BiopythonWarning
|
|
|
|
warnings.warn(
|
|
"Your BioSQL PostgreSQL schema includes some rules "
|
|
"currently required for bioperl-db but which may"
|
|
"cause problems loading data using Biopython (see "
|
|
"BioSQL's RedMine Bug 2839 aka GitHub Issue 4 "
|
|
"https://github.com/biosql/biosql/issues/4). "
|
|
"If you do not use BioPerl, please remove these "
|
|
"rules. Biopython should cope with the rules "
|
|
"present, but with a performance penalty when "
|
|
"loading new records.",
|
|
BiopythonWarning,
|
|
)
|
|
global _POSTGRES_RULES_PRESENT
|
|
_POSTGRES_RULES_PRESENT = True
|
|
|
|
elif driver == "sqlite3":
|
|
# Tell SQLite that we want to use foreign keys
|
|
# https://www.sqlite.org/foreignkeys.html#fk_enable
|
|
server.adaptor.execute("PRAGMA foreign_keys = ON")
|
|
|
|
return server
|
|
|
|
|
|
class DBServer:
|
|
"""Represents a BioSQL database containing namespaces (sub-databases).
|
|
|
|
This acts like a Python dictionary, giving access to each namespace
|
|
(defined by a row in the biodatabase table) as a BioSeqDatabase object.
|
|
"""
|
|
|
|
def __init__(self, conn, module, module_name=None):
|
|
"""Create a DBServer object.
|
|
|
|
Arguments:
|
|
- conn - A database connection object
|
|
- module - The module used to create the database connection
|
|
- module_name - Optionally, the name of the module. Default: module.__name__
|
|
|
|
Normally you would not want to create a DBServer object yourself.
|
|
Instead use the open_database function, which returns an instance of DBServer.
|
|
"""
|
|
self.module = module
|
|
if module_name is None:
|
|
module_name = module.__name__
|
|
if module_name == "mysql.connector":
|
|
wrap_cursor = True
|
|
else:
|
|
wrap_cursor = False
|
|
# Get module specific Adaptor or the base (general) Adaptor
|
|
Adapt = _interface_specific_adaptors.get(module_name, Adaptor)
|
|
self.adaptor = Adapt(
|
|
conn, DBUtils.get_dbutils(module_name), wrap_cursor=wrap_cursor
|
|
)
|
|
self.module_name = module_name
|
|
|
|
def __repr__(self):
|
|
"""Return a short description of the class name and database connection."""
|
|
return f"{self.__class__.__name__}({self.adaptor.conn!r})"
|
|
|
|
def __getitem__(self, name):
|
|
"""Return a BioSeqDatabase object.
|
|
|
|
Arguments:
|
|
- name - The name of the BioSeqDatabase
|
|
|
|
"""
|
|
return BioSeqDatabase(self.adaptor, name)
|
|
|
|
def __len__(self):
|
|
"""Return number of namespaces (sub-databases) in this database."""
|
|
sql = "SELECT COUNT(name) FROM biodatabase;"
|
|
return int(self.adaptor.execute_and_fetch_col0(sql)[0])
|
|
|
|
def __contains__(self, value):
|
|
"""Check if a namespace (sub-database) in this database."""
|
|
sql = "SELECT COUNT(name) FROM biodatabase WHERE name=%s;"
|
|
return bool(self.adaptor.execute_and_fetch_col0(sql, (value,))[0])
|
|
|
|
def __iter__(self):
|
|
"""Iterate over namespaces (sub-databases) in the database."""
|
|
# TODO - Iterate over the cursor, much more efficient
|
|
return iter(self.adaptor.list_biodatabase_names())
|
|
|
|
def keys(self):
|
|
"""Iterate over namespaces (sub-databases) in the database."""
|
|
return iter(self)
|
|
|
|
def values(self):
|
|
"""Iterate over BioSeqDatabase objects in the database."""
|
|
for key in self:
|
|
yield self[key]
|
|
|
|
def items(self):
|
|
"""Iterate over (namespace, BioSeqDatabase) in the database."""
|
|
for key in self:
|
|
yield key, self[key]
|
|
|
|
def __delitem__(self, name):
|
|
"""Remove a namespace and all its entries."""
|
|
if name not in self:
|
|
raise KeyError(name)
|
|
db_id = self.adaptor.fetch_dbid_by_dbname(name)
|
|
remover = Loader.DatabaseRemover(self.adaptor, db_id)
|
|
remover.remove()
|
|
|
|
def new_database(self, db_name, authority=None, description=None):
|
|
"""Add a new database to the server and return it."""
|
|
# make the database
|
|
sql = (
|
|
"INSERT INTO biodatabase (name, authority, description)"
|
|
" VALUES (%s, %s, %s)"
|
|
)
|
|
self.adaptor.execute(sql, (db_name, authority, description))
|
|
return BioSeqDatabase(self.adaptor, db_name)
|
|
|
|
def load_database_sql(self, sql_file):
|
|
"""Load a database schema into the given database.
|
|
|
|
This is used to create tables, etc when a database is first created.
|
|
sql_file should specify the complete path to a file containing
|
|
SQL entries for building the tables.
|
|
"""
|
|
# Not sophisticated enough for PG schema. Is it needed by MySQL?
|
|
# Looks like we need this more complicated way for both. Leaving it
|
|
# the default and removing the simple-minded approach.
|
|
|
|
# read the file with all comment lines removed
|
|
sql = ""
|
|
with open(sql_file) as sql_handle:
|
|
for line in sql_handle:
|
|
if line.startswith("--"): # don't include comment lines
|
|
pass
|
|
elif line.startswith("#"): # ditto for MySQL comments
|
|
pass
|
|
elif line.strip(): # only include non-blank lines
|
|
sql += line.strip() + " "
|
|
|
|
# two ways to load the SQL
|
|
# 1. PostgreSQL can load it all at once and actually needs to
|
|
# due to FUNCTION defines at the end of the SQL which mess up
|
|
# the splitting by semicolons
|
|
if self.module_name in ["psycopg2", "pgdb"]:
|
|
self.adaptor.cursor.execute(sql)
|
|
# 2. MySQL needs the database loading split up into single lines of
|
|
# SQL executed one at a time
|
|
elif self.module_name in ["mysql.connector", "MySQLdb", "sqlite3"]:
|
|
sql_parts = sql.split(";") # one line per sql command
|
|
# don't use the last item, it's blank
|
|
for sql_line in sql_parts[:-1]:
|
|
self.adaptor.cursor.execute(sql_line)
|
|
else:
|
|
raise ValueError(f"Module {self.module_name} not supported by the loader.")
|
|
|
|
def commit(self):
|
|
"""Commit the current transaction to the database."""
|
|
return self.adaptor.commit()
|
|
|
|
def rollback(self):
|
|
"""Roll-back the current transaction."""
|
|
return self.adaptor.rollback()
|
|
|
|
def close(self):
|
|
"""Close the connection. No further activity possible."""
|
|
return self.adaptor.close()
|
|
|
|
|
|
class _CursorWrapper:
|
|
"""A wrapper for mysql.connector resolving bytestring representations."""
|
|
|
|
def __init__(self, real_cursor):
|
|
self.real_cursor = real_cursor
|
|
|
|
def execute(self, operation, params=None, multi=False):
|
|
"""Execute a sql statement."""
|
|
self.real_cursor.execute(operation, params, multi)
|
|
|
|
def executemany(self, operation, params):
|
|
"""Execute many sql statements."""
|
|
self.real_cursor.executemany(operation, params)
|
|
|
|
def _convert_tuple(self, tuple_):
|
|
"""Decode any bytestrings present in the row (PRIVATE)."""
|
|
tuple_list = list(tuple_)
|
|
for i, elem in enumerate(tuple_list):
|
|
if isinstance(elem, bytes):
|
|
tuple_list[i] = elem.decode("utf-8")
|
|
return tuple(tuple_list)
|
|
|
|
def _convert_list(self, lst):
|
|
ret_lst = []
|
|
for tuple_ in lst:
|
|
new_tuple = self._convert_tuple(tuple_)
|
|
ret_lst.append(new_tuple)
|
|
return ret_lst
|
|
|
|
def fetchall(self):
|
|
rv = self.real_cursor.fetchall()
|
|
return self._convert_list(rv)
|
|
|
|
def fetchone(self):
|
|
tuple_ = self.real_cursor.fetchone()
|
|
return self._convert_tuple(tuple_)
|
|
|
|
|
|
class Adaptor:
|
|
"""High level wrapper for a database connection and cursor.
|
|
|
|
Most database calls in BioSQL are done indirectly though this adaptor
|
|
class. This provides helper methods for fetching data and executing
|
|
sql.
|
|
"""
|
|
|
|
def __init__(self, conn, dbutils, wrap_cursor=False):
|
|
"""Create an Adaptor object.
|
|
|
|
Arguments:
|
|
- conn - A database connection
|
|
- dbutils - A BioSQL.DBUtils object
|
|
- wrap_cursor - Optional, whether to wrap the cursor object
|
|
|
|
"""
|
|
self.conn = conn
|
|
if wrap_cursor:
|
|
self.cursor = _CursorWrapper(conn.cursor())
|
|
else:
|
|
self.cursor = conn.cursor()
|
|
self.dbutils = dbutils
|
|
|
|
def last_id(self, table):
|
|
"""Return the last row id for the selected table."""
|
|
return self.dbutils.last_id(self.cursor, table)
|
|
|
|
def autocommit(self, y=True):
|
|
"""Set the autocommit mode. True values enable; False value disable."""
|
|
return self.dbutils.autocommit(self.conn, y)
|
|
|
|
def commit(self):
|
|
"""Commit the current transaction."""
|
|
return self.conn.commit()
|
|
|
|
def rollback(self):
|
|
"""Roll-back the current transaction."""
|
|
return self.conn.rollback()
|
|
|
|
def close(self):
|
|
"""Close the connection. No further activity possible."""
|
|
return self.conn.close()
|
|
|
|
def fetch_dbid_by_dbname(self, dbname):
|
|
"""Return the internal id for the sub-database using its name."""
|
|
self.execute(
|
|
"select biodatabase_id from biodatabase where name = %s", (dbname,)
|
|
)
|
|
rv = self.cursor.fetchall()
|
|
if not rv:
|
|
raise KeyError(f"Cannot find biodatabase with name {dbname!r}")
|
|
return rv[0][0]
|
|
|
|
def fetch_seqid_by_display_id(self, dbid, name):
|
|
"""Return the internal id for a sequence using its display id.
|
|
|
|
Arguments:
|
|
- dbid - the internal id for the sub-database
|
|
- name - the name of the sequence. Corresponds to the
|
|
name column of the bioentry table of the SQL schema
|
|
|
|
"""
|
|
sql = "select bioentry_id from bioentry where name = %s"
|
|
fields = [name]
|
|
if dbid:
|
|
sql += " and biodatabase_id = %s"
|
|
fields.append(dbid)
|
|
self.execute(sql, fields)
|
|
rv = self.cursor.fetchall()
|
|
if not rv:
|
|
raise IndexError(f"Cannot find display id {name!r}")
|
|
if len(rv) > 1:
|
|
raise IndexError(f"More than one entry with display id {name!r}")
|
|
return rv[0][0]
|
|
|
|
def fetch_seqid_by_accession(self, dbid, name):
|
|
"""Return the internal id for a sequence using its accession.
|
|
|
|
Arguments:
|
|
- dbid - the internal id for the sub-database
|
|
- name - the accession of the sequence. Corresponds to the
|
|
accession column of the bioentry table of the SQL schema
|
|
|
|
"""
|
|
sql = "select bioentry_id from bioentry where accession = %s"
|
|
fields = [name]
|
|
if dbid:
|
|
sql += " and biodatabase_id = %s"
|
|
fields.append(dbid)
|
|
self.execute(sql, fields)
|
|
rv = self.cursor.fetchall()
|
|
if not rv:
|
|
raise IndexError(f"Cannot find accession {name!r}")
|
|
if len(rv) > 1:
|
|
raise IndexError(f"More than one entry with accession {name!r}")
|
|
return rv[0][0]
|
|
|
|
def fetch_seqids_by_accession(self, dbid, name):
|
|
"""Return a list internal ids using an accession.
|
|
|
|
Arguments:
|
|
- dbid - the internal id for the sub-database
|
|
- name - the accession of the sequence. Corresponds to the
|
|
accession column of the bioentry table of the SQL schema
|
|
|
|
"""
|
|
sql = "select bioentry_id from bioentry where accession = %s"
|
|
fields = [name]
|
|
if dbid:
|
|
sql += " and biodatabase_id = %s"
|
|
fields.append(dbid)
|
|
return self.execute_and_fetch_col0(sql, fields)
|
|
|
|
def fetch_seqid_by_version(self, dbid, name):
|
|
"""Return the internal id for a sequence using its accession and version.
|
|
|
|
Arguments:
|
|
- dbid - the internal id for the sub-database
|
|
- name - the accession of the sequence containing a version number.
|
|
Must correspond to <accession>.<version>
|
|
|
|
"""
|
|
acc_version = name.split(".")
|
|
if len(acc_version) > 2:
|
|
raise IndexError(f"Bad version {name!r}")
|
|
acc = acc_version[0]
|
|
if len(acc_version) == 2:
|
|
version = acc_version[1]
|
|
else:
|
|
version = "0"
|
|
sql = "SELECT bioentry_id FROM bioentry WHERE accession = %s AND version = %s"
|
|
fields = [acc, version]
|
|
if dbid:
|
|
sql += " and biodatabase_id = %s"
|
|
fields.append(dbid)
|
|
self.execute(sql, fields)
|
|
rv = self.cursor.fetchall()
|
|
if not rv:
|
|
raise IndexError(f"Cannot find version {name!r}")
|
|
if len(rv) > 1:
|
|
raise IndexError(f"More than one entry with version {name!r}")
|
|
return rv[0][0]
|
|
|
|
def fetch_seqid_by_identifier(self, dbid, identifier):
|
|
"""Return the internal id for a sequence using its identifier.
|
|
|
|
Arguments:
|
|
- dbid - the internal id for the sub-database
|
|
- identifier - the identifier of the sequence. Corresponds to
|
|
the identifier column of the bioentry table in the SQL schema.
|
|
|
|
"""
|
|
# YB: was fetch_seqid_by_seqid
|
|
sql = "SELECT bioentry_id FROM bioentry WHERE identifier = %s"
|
|
fields = [identifier]
|
|
if dbid:
|
|
sql += " and biodatabase_id = %s"
|
|
fields.append(dbid)
|
|
self.execute(sql, fields)
|
|
rv = self.cursor.fetchall()
|
|
if not rv:
|
|
raise IndexError(f"Cannot find display id {identifier!r}")
|
|
return rv[0][0]
|
|
|
|
def list_biodatabase_names(self):
|
|
"""Return a list of all of the sub-databases."""
|
|
return self.execute_and_fetch_col0("SELECT name FROM biodatabase")
|
|
|
|
def list_bioentry_ids(self, dbid):
|
|
"""Return a list of internal ids for all of the sequences in a sub-databae.
|
|
|
|
Arguments:
|
|
- dbid - The internal id for a sub-database
|
|
|
|
"""
|
|
return self.execute_and_fetch_col0(
|
|
"SELECT bioentry_id FROM bioentry WHERE biodatabase_id = %s", (dbid,)
|
|
)
|
|
|
|
def list_bioentry_display_ids(self, dbid):
|
|
"""Return a list of all sequence names in a sub-databae.
|
|
|
|
Arguments:
|
|
- dbid - The internal id for a sub-database
|
|
|
|
"""
|
|
return self.execute_and_fetch_col0(
|
|
"SELECT name FROM bioentry WHERE biodatabase_id = %s", (dbid,)
|
|
)
|
|
|
|
def list_any_ids(self, sql, args):
|
|
"""Return ids given a SQL statement to select for them.
|
|
|
|
This assumes that the given SQL does a SELECT statement that
|
|
returns a list of items. This parses them out of the 2D list
|
|
they come as and just returns them in a list.
|
|
"""
|
|
return self.execute_and_fetch_col0(sql, args)
|
|
|
|
def execute_one(self, sql, args=None):
|
|
"""Execute sql that returns 1 record, and return the record."""
|
|
self.execute(sql, args or ())
|
|
rv = self.cursor.fetchall()
|
|
if len(rv) != 1:
|
|
raise ValueError(f"Expected 1 response, got {len(rv)}.")
|
|
return rv[0]
|
|
|
|
def execute(self, sql, args=None):
|
|
"""Just execute an sql command."""
|
|
if os.name == "java":
|
|
sql = sql.replace("%s", "?")
|
|
self.dbutils.execute(self.cursor, sql, args)
|
|
|
|
def executemany(self, sql, args):
|
|
"""Execute many sql commands."""
|
|
if os.name == "java":
|
|
sql = sql.replace("%s", "?")
|
|
self.dbutils.executemany(self.cursor, sql, args)
|
|
|
|
def get_subseq_as_string(self, seqid, start, end):
|
|
"""Return a substring of a sequence.
|
|
|
|
Arguments:
|
|
- seqid - The internal id for the sequence
|
|
- start - The start position of the sequence; 0-indexed
|
|
- end - The end position of the sequence
|
|
|
|
"""
|
|
length = end - start
|
|
# XXX Check this on MySQL and PostgreSQL. substr should be general,
|
|
# does it need dbutils?
|
|
# return self.execute_one(
|
|
# """select SUBSTRING(seq FROM %s FOR %s)
|
|
# from biosequence where bioentry_id = %s""",
|
|
# (start+1, length, seqid))[0]
|
|
return self.execute_one(
|
|
"SELECT SUBSTR(seq, %s, %s) FROM biosequence WHERE bioentry_id = %s",
|
|
(start + 1, length, seqid),
|
|
)[0]
|
|
|
|
def execute_and_fetch_col0(self, sql, args=None):
|
|
"""Return a list of values from the first column in the row."""
|
|
self.execute(sql, args or ())
|
|
return [field[0] for field in self.cursor.fetchall()]
|
|
|
|
def execute_and_fetchall(self, sql, args=None):
|
|
"""Return a list of tuples of all rows."""
|
|
self.execute(sql, args or ())
|
|
return self.cursor.fetchall()
|
|
|
|
|
|
class MysqlConnectorAdaptor(Adaptor):
|
|
"""A BioSQL Adaptor class with fixes for the MySQL interface.
|
|
|
|
BioSQL was failing due to returns of bytearray objects from
|
|
the mysql-connector-python database connector. This adaptor
|
|
class scrubs returns of bytearrays and of byte strings converting
|
|
them to string objects instead. This adaptor class was made in
|
|
response to backwards incompatible changes added to
|
|
mysql-connector-python in release 2.0.0 of the package.
|
|
"""
|
|
|
|
@staticmethod
|
|
def _bytearray_to_str(s):
|
|
"""If s is bytes or bytearray, convert to a string (PRIVATE)."""
|
|
if isinstance(s, (bytes, bytearray)):
|
|
return s.decode()
|
|
return s
|
|
|
|
def execute_one(self, sql, args=None):
|
|
"""Execute sql that returns 1 record, and return the record."""
|
|
out = super().execute_one(sql, args)
|
|
return tuple(self._bytearray_to_str(v) for v in out)
|
|
|
|
def execute_and_fetch_col0(self, sql, args=None):
|
|
"""Return a list of values from the first column in the row."""
|
|
out = super().execute_and_fetch_col0(sql, args)
|
|
return [self._bytearray_to_str(column) for column in out]
|
|
|
|
def execute_and_fetchall(self, sql, args=None):
|
|
"""Return a list of tuples of all rows."""
|
|
out = super().execute_and_fetchall(sql, args)
|
|
return [tuple(self._bytearray_to_str(v) for v in o) for o in out]
|
|
|
|
|
|
_interface_specific_adaptors = {
|
|
# If SQL interfaces require a specific adaptor, use this to map the adaptor
|
|
"mysql.connector": MysqlConnectorAdaptor,
|
|
"MySQLdb": MysqlConnectorAdaptor,
|
|
}
|
|
|
|
_allowed_lookups = {
|
|
# Lookup name / function name to get id, function to list all ids
|
|
"primary_id": "fetch_seqid_by_identifier",
|
|
"gi": "fetch_seqid_by_identifier",
|
|
"display_id": "fetch_seqid_by_display_id",
|
|
"name": "fetch_seqid_by_display_id",
|
|
"accession": "fetch_seqid_by_accession",
|
|
"version": "fetch_seqid_by_version",
|
|
}
|
|
|
|
|
|
class BioSeqDatabase:
|
|
"""Represents a namespace (sub-database) within the BioSQL database.
|
|
|
|
i.e. One row in the biodatabase table, and all all rows in the bioentry
|
|
table associated with it.
|
|
"""
|
|
|
|
def __init__(self, adaptor, name):
|
|
"""Create a BioDatabase object.
|
|
|
|
Arguments:
|
|
- adaptor - A BioSQL.Adaptor object
|
|
- name - The name of the sub-database (namespace)
|
|
|
|
"""
|
|
self.adaptor = adaptor
|
|
self.name = name
|
|
self.dbid = self.adaptor.fetch_dbid_by_dbname(name)
|
|
|
|
def __repr__(self):
|
|
"""Return a short summary of the BioSeqDatabase."""
|
|
return f"BioSeqDatabase({self.adaptor!r}, {self.name!r})"
|
|
|
|
def get_Seq_by_id(self, name):
|
|
"""Get a DBSeqRecord object by its name.
|
|
|
|
Example: seq_rec = db.get_Seq_by_id('ROA1_HUMAN')
|
|
|
|
The name of this method is misleading since it returns a DBSeqRecord
|
|
rather than a Seq object, and presumably was to mirror BioPerl.
|
|
"""
|
|
seqid = self.adaptor.fetch_seqid_by_display_id(self.dbid, name)
|
|
return BioSeq.DBSeqRecord(self.adaptor, seqid)
|
|
|
|
def get_Seq_by_acc(self, name):
|
|
"""Get a DBSeqRecord object by accession number.
|
|
|
|
Example: seq_rec = db.get_Seq_by_acc('X77802')
|
|
|
|
The name of this method is misleading since it returns a DBSeqRecord
|
|
rather than a Seq object, and presumably was to mirror BioPerl.
|
|
"""
|
|
seqid = self.adaptor.fetch_seqid_by_accession(self.dbid, name)
|
|
return BioSeq.DBSeqRecord(self.adaptor, seqid)
|
|
|
|
def get_Seq_by_ver(self, name):
|
|
"""Get a DBSeqRecord object by version number.
|
|
|
|
Example: seq_rec = db.get_Seq_by_ver('X77802.1')
|
|
|
|
The name of this method is misleading since it returns a DBSeqRecord
|
|
rather than a Seq object, and presumably was to mirror BioPerl.
|
|
"""
|
|
seqid = self.adaptor.fetch_seqid_by_version(self.dbid, name)
|
|
return BioSeq.DBSeqRecord(self.adaptor, seqid)
|
|
|
|
def get_Seqs_by_acc(self, name):
|
|
"""Get a list of DBSeqRecord objects by accession number.
|
|
|
|
Example: seq_recs = db.get_Seq_by_acc('X77802')
|
|
|
|
The name of this method is misleading since it returns a list of
|
|
DBSeqRecord objects rather than a list of Seq objects, and presumably
|
|
was to mirror BioPerl.
|
|
"""
|
|
seqids = self.adaptor.fetch_seqids_by_accession(self.dbid, name)
|
|
return [BioSeq.DBSeqRecord(self.adaptor, seqid) for seqid in seqids]
|
|
|
|
def __getitem__(self, key):
|
|
"""Return a DBSeqRecord for one of the sequences in the sub-database.
|
|
|
|
Arguments:
|
|
- key - The internal id for the sequence
|
|
|
|
"""
|
|
record = BioSeq.DBSeqRecord(self.adaptor, key)
|
|
if record._biodatabase_id != self.dbid:
|
|
raise KeyError(f"Entry {key!r} does exist, but not in current name space")
|
|
return record
|
|
|
|
def __delitem__(self, key):
|
|
"""Remove an entry and all its annotation."""
|
|
if key not in self:
|
|
raise KeyError(
|
|
f"Entry {key!r} cannot be deleted. It was not found or is invalid"
|
|
)
|
|
# Assuming this will automatically cascade to the other tables...
|
|
sql = "DELETE FROM bioentry WHERE biodatabase_id=%s AND bioentry_id=%s;"
|
|
self.adaptor.execute(sql, (self.dbid, key))
|
|
|
|
def __len__(self):
|
|
"""Return number of records in this namespace (sub database)."""
|
|
sql = "SELECT COUNT(bioentry_id) FROM bioentry WHERE biodatabase_id=%s;"
|
|
return int(self.adaptor.execute_and_fetch_col0(sql, (self.dbid,))[0])
|
|
|
|
def __contains__(self, value):
|
|
"""Check if a primary (internal) id is this namespace (sub database)."""
|
|
sql = (
|
|
"SELECT COUNT(bioentry_id) FROM bioentry "
|
|
"WHERE biodatabase_id=%s AND bioentry_id=%s;"
|
|
)
|
|
# The bioentry_id field is an integer in the schema.
|
|
# PostgreSQL will throw an error if we use a non integer in the query.
|
|
try:
|
|
bioentry_id = int(value)
|
|
except ValueError:
|
|
return False
|
|
return bool(
|
|
self.adaptor.execute_and_fetch_col0(sql, (self.dbid, bioentry_id))[0]
|
|
)
|
|
|
|
def __iter__(self):
|
|
"""Iterate over ids (which may not be meaningful outside this database)."""
|
|
# TODO - Iterate over the cursor, much more efficient
|
|
return iter(self.adaptor.list_bioentry_ids(self.dbid))
|
|
|
|
def keys(self):
|
|
"""Iterate over ids (which may not be meaningful outside this database)."""
|
|
return iter(self)
|
|
|
|
def values(self):
|
|
"""Iterate over DBSeqRecord objects in the namespace (sub database)."""
|
|
for key in self:
|
|
yield self[key]
|
|
|
|
def items(self):
|
|
"""Iterate over (id, DBSeqRecord) for the namespace (sub database)."""
|
|
for key in self:
|
|
yield key, self[key]
|
|
|
|
def lookup(self, **kwargs):
|
|
"""Return a DBSeqRecord using an acceptable identifier.
|
|
|
|
Arguments:
|
|
- kwargs - A single key-value pair where the key is one
|
|
of primary_id, gi, display_id, name, accession, version
|
|
|
|
"""
|
|
if len(kwargs) != 1:
|
|
raise TypeError("single key/value parameter expected")
|
|
k, v = list(kwargs.items())[0]
|
|
if k not in _allowed_lookups:
|
|
raise TypeError(
|
|
f"lookup() expects one of {list(_allowed_lookups.keys())!r}, not {k!r}"
|
|
)
|
|
lookup_name = _allowed_lookups[k]
|
|
lookup_func = getattr(self.adaptor, lookup_name)
|
|
seqid = lookup_func(self.dbid, v)
|
|
return BioSeq.DBSeqRecord(self.adaptor, seqid)
|
|
|
|
def load(self, record_iterator, fetch_NCBI_taxonomy=False):
|
|
"""Load a set of SeqRecords into the BioSQL database.
|
|
|
|
record_iterator is either a list of SeqRecord objects, or an
|
|
Iterator object that returns SeqRecord objects (such as the
|
|
output from the Bio.SeqIO.parse() function), which will be
|
|
used to populate the database.
|
|
|
|
fetch_NCBI_taxonomy is boolean flag allowing or preventing
|
|
connection to the taxonomic database on the NCBI server
|
|
(via Bio.Entrez) to fetch a detailed taxonomy for each
|
|
SeqRecord.
|
|
|
|
Example::
|
|
|
|
from Bio import SeqIO
|
|
count = db.load(SeqIO.parse(open(filename), format))
|
|
|
|
Returns the number of records loaded.
|
|
"""
|
|
db_loader = Loader.DatabaseLoader(self.adaptor, self.dbid, fetch_NCBI_taxonomy)
|
|
num_records = 0
|
|
global _POSTGRES_RULES_PRESENT
|
|
for cur_record in record_iterator:
|
|
num_records += 1
|
|
# Hack to work around BioSQL Bug 2839 - If using PostgreSQL and
|
|
# the RULES are present check for a duplicate record before loading
|
|
if _POSTGRES_RULES_PRESENT:
|
|
# Recreate what the Loader's _load_bioentry_table will do:
|
|
if cur_record.id.count(".") == 1:
|
|
accession, version = cur_record.id.split(".")
|
|
try:
|
|
version = int(version)
|
|
except ValueError:
|
|
accession = cur_record.id
|
|
version = 0
|
|
else:
|
|
accession = cur_record.id
|
|
version = 0
|
|
gi = cur_record.annotations.get("gi")
|
|
sql = (
|
|
"SELECT bioentry_id FROM bioentry "
|
|
"WHERE (identifier = '%s' AND biodatabase_id = '%s') "
|
|
"OR (accession = '%s' AND version = '%s' AND biodatabase_id = '%s')"
|
|
)
|
|
self.adaptor.execute(
|
|
sql % (gi, self.dbid, accession, version, self.dbid)
|
|
)
|
|
if self.adaptor.cursor.fetchone():
|
|
raise self.adaptor.conn.IntegrityError(
|
|
"Duplicate record detected: record has not been inserted"
|
|
)
|
|
# End of hack
|
|
db_loader.load_seqrecord(cur_record)
|
|
return num_records
|