MST

星途 面试题库

面试题:Python多线程与数据库操作优化的资源调度

在一个高并发的Python应用中,使用多线程频繁地进行数据库读写操作。现有多个线程竞争数据库资源,同时还有其他I/O任务(如文件读写)也在争夺系统资源。请设计一种资源调度策略,以确保数据库操作的高效性和稳定性,并且说明如何通过Python代码实现该调度策略,包括对多线程、数据库连接和其他I/O任务的协调管理。
20.1万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

资源调度策略设计

  1. 数据库连接池:使用数据库连接池来管理数据库连接,避免频繁创建和销毁连接。这样可以减少连接开销,提高数据库操作的效率。例如,对于MySQL数据库,可以使用DBUtils库来实现连接池。
  2. 线程队列:创建一个线程安全的队列(如queue.Queue),将数据库操作任务放入队列中。然后,启动固定数量的工作线程从队列中取出任务并执行,这样可以控制同时进行数据库操作的线程数量,避免过多线程竞争数据库资源。
  3. I/O任务分离:将数据库操作和其他I/O任务(如文件读写)分离开来。为文件读写等其他I/O任务创建单独的线程池,通过类似的队列机制来管理任务。这样可以避免不同类型的I/O任务相互干扰。
  4. 信号量控制:使用信号量(threading.Semaphore)来限制同时访问数据库的线程数量。例如,如果数据库服务器能承受的最大并发连接数为10,就创建一个初始值为10的信号量。每个线程在进行数据库操作前先获取信号量,操作完成后释放信号量。

Python代码实现

  1. 数据库连接池示例(以MySQL和DBUtils为例)
from dbutils.pooled_db import PooledDB
import mysql.connector

# 创建数据库连接池
pool = PooledDB(
    creator=mysql.connector,
    host='localhost',
    user='your_user',
    password='your_password',
    database='your_database',
    autocommit=True,
    maxconnections=10
)
  1. 线程队列与数据库操作示例
import threading
import queue

# 创建任务队列
db_task_queue = queue.Queue()

def db_worker():
    while True:
        task = db_task_queue.get()
        if task is None:
            break
        try:
            # 从连接池获取连接
            conn = pool.connection()
            cursor = conn.cursor()
            # 执行数据库操作任务
            cursor.execute(task)
            result = cursor.fetchall()
            print(f"Database operation result: {result}")
            cursor.close()
            conn.close()
        except Exception as e:
            print(f"Database operation error: {e}")
        finally:
            db_task_queue.task_done()

# 启动工作线程
num_db_workers = 5
db_workers = []
for _ in range(num_db_workers):
    worker = threading.Thread(target=db_worker)
    worker.start()
    db_workers.append(worker)

# 示例:添加数据库操作任务到队列
db_task_queue.put("SELECT * FROM your_table")
  1. I/O任务分离示例(以文件读写为例)
import queue
import threading

# 创建文件读写任务队列
file_task_queue = queue.Queue()

def file_worker():
    while True:
        task = file_task_queue.get()
        if task is None:
            break
        try:
            with open(task['file_path'], task['mode']) as f:
                if task['mode'] == 'r':
                    content = f.read()
                    print(f"File read result: {content}")
                else:
                    f.write(task['content'])
                    print(f"File write completed.")
        except Exception as e:
            print(f"File operation error: {e}")
        finally:
            file_task_queue.task_done()

# 启动文件读写工作线程
num_file_workers = 3
file_workers = []
for _ in range(num_file_workers):
    worker = threading.Thread(target=file_worker)
    worker.start()
    file_workers.append(worker)

# 示例:添加文件读写任务到队列
file_task_queue.put({
    'file_path': 'test.txt',
  'mode': 'r'
})
  1. 信号量控制示例
import threading

# 创建信号量
db_semaphore = threading.Semaphore(10)

def db_operation():
    with db_semaphore:
        # 进行数据库操作
        conn = pool.connection()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM your_table")
        result = cursor.fetchall()
        print(f"Database operation result: {result}")
        cursor.close()
        conn.close()

在上述代码实现中,通过数据库连接池管理数据库连接,线程队列协调任务执行,信号量控制数据库访问并发数,并将不同类型的I/O任务分离开来,以此确保数据库操作的高效性和稳定性,同时实现对多线程、数据库连接和其他I/O任务的协调管理。最后,在任务处理完成后,记得停止工作线程并清理资源,例如向任务队列中放入None值让工作线程退出,关闭连接池等。