MST

星途 面试题库

面试题:Python数据库连接池异常处理与设计

设计一个健壮的Python数据库连接池,要求能够优雅地处理数据库连接超时、连接中断、死锁等异常情况。描述整体设计思路,包括如何检测异常、如何恢复连接以及如何避免死锁的发生,并给出关键代码片段示例。
45.2万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 连接池管理:使用一个队列来管理数据库连接对象。初始化连接池时,创建一定数量的连接并放入队列。
  2. 异常检测
    • 连接超时:在获取连接时,设置获取连接的超时时间。若在规定时间内未能从队列获取连接,则抛出超时异常。
    • 连接中断:在每次从连接池获取连接后,执行一个简单的数据库查询(如 SELECT 1)来检测连接是否可用。若查询失败,捕获相应异常,标记该连接不可用。
    • 死锁检测:数据库本身一般有死锁检测机制,Python 层面可以通过捕获数据库操作抛出的死锁相关异常(如 psycopg2 库中 psycopg2.extensions.TransactionRollbackError 异常在某些情况下可能表示死锁)来检测。
  3. 恢复连接
    • 连接中断:当检测到连接中断后,将该连接从连接池中移除,关闭连接对象,并重新创建一个新的连接放入连接池。
    • 死锁:捕获到死锁异常后,回滚当前事务(如果有事务正在进行),关闭当前连接(可选,具体看数据库驱动和业务需求),从连接池重新获取连接并尝试重新执行操作。
  4. 避免死锁
    • 事务顺序:确保在不同业务逻辑中,对数据库资源的访问顺序一致。例如,若涉及多个表的操作,在所有业务逻辑中都按照相同的表顺序进行操作。
    • 锁粒度:尽量使用较低粒度的锁。如使用行级锁而非表级锁,减少锁冲突的范围。
    • 事务超时:设置合理的事务超时时间,若事务执行时间过长,自动回滚事务,避免长时间持有锁。

关键代码示例(以 psycopg2 库连接 PostgreSQL 数据库为例)

import queue
import threading
import psycopg2
import time


class DatabaseConnectionPool:
    def __init__(self, host, port, database, user, password, pool_size=5, timeout=10):
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.pool_size = pool_size
        self.timeout = timeout
        self.pool = queue.Queue(maxsize=pool_size)
        self._init_pool()

    def _init_pool(self):
        for _ in range(self.pool_size):
            connection = self._create_connection()
            self.pool.put(connection)

    def _create_connection(self):
        try:
            return psycopg2.connect(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password
            )
        except psycopg2.Error as e:
            print(f"Failed to create connection: {e}")
            return None

    def get_connection(self):
        try:
            return self.pool.get(timeout=self.timeout)
        except queue.Empty:
            raise TimeoutError("Failed to get connection from pool within timeout.")

    def return_connection(self, connection):
        try:
            # 简单检测连接是否可用
            cursor = connection.cursor()
            cursor.execute("SELECT 1")
            cursor.close()
            self.pool.put(connection)
        except psycopg2.Error as e:
            print(f"Connection seems broken, recreating: {e}")
            self.pool.put(self._create_connection())


def example_usage():
    pool = DatabaseConnectionPool(
        host='localhost',
        port=5432,
        database='test_db',
        user='test_user',
        password='test_password'
    )
    connection = pool.get_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT * FROM some_table")
            rows = cursor.fetchall()
            for row in rows:
                print(row)
        except psycopg2.Error as e:
            if isinstance(e, psycopg2.extensions.TransactionRollbackError):
                print("Possible deadlock detected, rolling back and retrying...")
                connection.rollback()
                # 这里可以选择关闭当前连接并重新获取
                new_connection = pool.get_connection()
                new_cursor = new_connection.cursor()
                new_cursor.execute("SELECT * FROM some_table")
                new_rows = new_cursor.fetchall()
                for row in new_rows:
                    print(row)
                new_cursor.close()
                pool.return_connection(new_connection)
            else:
                print(f"Database operation error: {e}")
        cursor.close()
    finally:
        pool.return_connection(connection)


if __name__ == "__main__":
    example_usage()

上述代码实现了一个简单的 PostgreSQL 数据库连接池,包含获取连接、检测连接可用性、处理死锁异常等功能。在实际应用中,可根据具体数据库和业务需求进行进一步优化和扩展。