设计思路
- 连接池管理:使用一个列表来存储数据库连接对象,同时记录当前已使用的连接数量和最大连接数量。
- 动态调整连接数量:提供方法来增加或减少最大连接数,当连接不足且未达到最大连接数时,可以创建新连接。
- 连接超时设置:在获取连接时记录开始时间,若获取连接时间超过设定的超时时间,则抛出异常。
- 连接复用:获取连接时优先从空闲连接列表中获取,使用完毕后将连接放回空闲列表。
- 连接状态监控:记录每个连接的使用次数、最后使用时间等信息,方便监控和调试。
关键代码实现
import sqlite3
import time
class SQLiteConnectionPool:
def __init__(self, db_path, max_connections=5, timeout=10):
self.db_path = db_path
self.max_connections = max_connections
self.timeout = timeout
self.connections = []
self.in_use_connections = 0
self.connection_stats = {}
def get_connection(self):
start_time = time.time()
while True:
if self.connections:
conn = self.connections.pop(0)
self.in_use_connections += 1
return conn
elif self.in_use_connections < self.max_connections:
conn = sqlite3.connect(self.db_path)
self.in_use_connections += 1
self.connection_stats[id(conn)] = {
'usage_count': 1,
'last_used': time.time()
}
return conn
if time.time() - start_time > self.timeout:
raise TimeoutError("获取连接超时")
time.sleep(0.1)
def return_connection(self, conn):
if id(conn) not in self.connection_stats:
self.connection_stats[id(conn)] = {
'usage_count': 1,
'last_used': time.time()
}
else:
self.connection_stats[id(conn)]['usage_count'] += 1
self.connection_stats[id(conn)]['last_used'] = time.time()
if self.in_use_connections > 0:
self.in_use_connections -= 1
self.connections.append(conn)
def increase_max_connections(self, num):
self.max_connections += num
def decrease_max_connections(self, num):
if self.max_connections - num < self.in_use_connections:
raise ValueError("不能减少到当前使用连接数以下")
self.max_connections -= num
while len(self.connections) > self.max_connections - self.in_use_connections:
conn = self.connections.pop()
conn.close()
def get_connection_stats(self):
return self.connection_stats
使用示例
# 创建连接池实例
pool = SQLiteConnectionPool('test.db', max_connections=10, timeout=5)
try:
# 获取连接
conn = pool.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM your_table')
rows = cursor.fetchall()
# 使用完后返回连接
pool.return_connection(conn)
except TimeoutError as e:
print(e)
except sqlite3.Error as e:
print(f"SQLite error: {e}")
finally:
# 打印连接状态统计信息
stats = pool.get_connection_stats()
print(stats)