MST
星途 面试题库

面试题:自定义Python数据库连接池的设计与实现

假设现有的Python数据库连接池库无法满足项目特定需求,需要你自定义一个数据库连接池类来连接SQLite数据库。要求该连接池支持动态调整连接数量、连接超时设置、连接复用及连接状态监控功能。请详细描述设计思路,并给出关键代码实现。
39.6万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 连接池管理:使用一个列表来存储数据库连接对象,同时记录当前已使用的连接数量和最大连接数量。
  2. 动态调整连接数量:提供方法来增加或减少最大连接数,当连接不足且未达到最大连接数时,可以创建新连接。
  3. 连接超时设置:在获取连接时记录开始时间,若获取连接时间超过设定的超时时间,则抛出异常。
  4. 连接复用:获取连接时优先从空闲连接列表中获取,使用完毕后将连接放回空闲列表。
  5. 连接状态监控:记录每个连接的使用次数、最后使用时间等信息,方便监控和调试。

关键代码实现

import sqlite3
import time


class SQLiteConnectionPool:
    def __init__(self, db_path, max_connections=5, timeout=10):
        self.db_path = db_path
        self.max_connections = max_connections
        self.timeout = timeout
        self.connections = []
        self.in_use_connections = 0
        self.connection_stats = {}

    def get_connection(self):
        start_time = time.time()
        while True:
            if self.connections:
                conn = self.connections.pop(0)
                self.in_use_connections += 1
                return conn
            elif self.in_use_connections < self.max_connections:
                conn = sqlite3.connect(self.db_path)
                self.in_use_connections += 1
                self.connection_stats[id(conn)] = {
                    'usage_count': 1,
                    'last_used': time.time()
                }
                return conn
            if time.time() - start_time > self.timeout:
                raise TimeoutError("获取连接超时")
            time.sleep(0.1)

    def return_connection(self, conn):
        if id(conn) not in self.connection_stats:
            self.connection_stats[id(conn)] = {
                'usage_count': 1,
                'last_used': time.time()
            }
        else:
            self.connection_stats[id(conn)]['usage_count'] += 1
            self.connection_stats[id(conn)]['last_used'] = time.time()
        if self.in_use_connections > 0:
            self.in_use_connections -= 1
            self.connections.append(conn)

    def increase_max_connections(self, num):
        self.max_connections += num

    def decrease_max_connections(self, num):
        if self.max_connections - num < self.in_use_connections:
            raise ValueError("不能减少到当前使用连接数以下")
        self.max_connections -= num
        while len(self.connections) > self.max_connections - self.in_use_connections:
            conn = self.connections.pop()
            conn.close()

    def get_connection_stats(self):
        return self.connection_stats

使用示例

# 创建连接池实例
pool = SQLiteConnectionPool('test.db', max_connections=10, timeout=5)
try:
    # 获取连接
    conn = pool.get_connection()
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM your_table')
    rows = cursor.fetchall()
    # 使用完后返回连接
    pool.return_connection(conn)
except TimeoutError as e:
    print(e)
except sqlite3.Error as e:
    print(f"SQLite error: {e}")
finally:
    # 打印连接状态统计信息
    stats = pool.get_connection_stats()
    print(stats)