面试题答案
一键面试设计思路
- 连接池管理:使用一个队列来管理数据库连接对象。初始化连接池时,创建一定数量的连接并放入队列。
- 异常检测:
- 连接超时:在获取连接时,设置获取连接的超时时间。若在规定时间内未能从队列获取连接,则抛出超时异常。
- 连接中断:在每次从连接池获取连接后,执行一个简单的数据库查询(如
SELECT 1
)来检测连接是否可用。若查询失败,捕获相应异常,标记该连接不可用。 - 死锁检测:数据库本身一般有死锁检测机制,Python 层面可以通过捕获数据库操作抛出的死锁相关异常(如
psycopg2
库中psycopg2.extensions.TransactionRollbackError
异常在某些情况下可能表示死锁)来检测。
- 恢复连接:
- 连接中断:当检测到连接中断后,将该连接从连接池中移除,关闭连接对象,并重新创建一个新的连接放入连接池。
- 死锁:捕获到死锁异常后,回滚当前事务(如果有事务正在进行),关闭当前连接(可选,具体看数据库驱动和业务需求),从连接池重新获取连接并尝试重新执行操作。
- 避免死锁:
- 事务顺序:确保在不同业务逻辑中,对数据库资源的访问顺序一致。例如,若涉及多个表的操作,在所有业务逻辑中都按照相同的表顺序进行操作。
- 锁粒度:尽量使用较低粒度的锁。如使用行级锁而非表级锁,减少锁冲突的范围。
- 事务超时:设置合理的事务超时时间,若事务执行时间过长,自动回滚事务,避免长时间持有锁。
关键代码示例(以 psycopg2
库连接 PostgreSQL 数据库为例)
import queue
import threading
import psycopg2
import time
class DatabaseConnectionPool:
def __init__(self, host, port, database, user, password, pool_size=5, timeout=10):
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self.pool_size = pool_size
self.timeout = timeout
self.pool = queue.Queue(maxsize=pool_size)
self._init_pool()
def _init_pool(self):
for _ in range(self.pool_size):
connection = self._create_connection()
self.pool.put(connection)
def _create_connection(self):
try:
return psycopg2.connect(
host=self.host,
port=self.port,
database=self.database,
user=self.user,
password=self.password
)
except psycopg2.Error as e:
print(f"Failed to create connection: {e}")
return None
def get_connection(self):
try:
return self.pool.get(timeout=self.timeout)
except queue.Empty:
raise TimeoutError("Failed to get connection from pool within timeout.")
def return_connection(self, connection):
try:
# 简单检测连接是否可用
cursor = connection.cursor()
cursor.execute("SELECT 1")
cursor.close()
self.pool.put(connection)
except psycopg2.Error as e:
print(f"Connection seems broken, recreating: {e}")
self.pool.put(self._create_connection())
def example_usage():
pool = DatabaseConnectionPool(
host='localhost',
port=5432,
database='test_db',
user='test_user',
password='test_password'
)
connection = pool.get_connection()
try:
cursor = connection.cursor()
try:
cursor.execute("SELECT * FROM some_table")
rows = cursor.fetchall()
for row in rows:
print(row)
except psycopg2.Error as e:
if isinstance(e, psycopg2.extensions.TransactionRollbackError):
print("Possible deadlock detected, rolling back and retrying...")
connection.rollback()
# 这里可以选择关闭当前连接并重新获取
new_connection = pool.get_connection()
new_cursor = new_connection.cursor()
new_cursor.execute("SELECT * FROM some_table")
new_rows = new_cursor.fetchall()
for row in new_rows:
print(row)
new_cursor.close()
pool.return_connection(new_connection)
else:
print(f"Database operation error: {e}")
cursor.close()
finally:
pool.return_connection(connection)
if __name__ == "__main__":
example_usage()
上述代码实现了一个简单的 PostgreSQL 数据库连接池,包含获取连接、检测连接可用性、处理死锁异常等功能。在实际应用中,可根据具体数据库和业务需求进行进一步优化和扩展。