实现思路
- 任务队列:使用
queue.Queue
来存储待处理的I/O任务。这个队列是线程安全的,能保证多线程环境下任务的正确添加和获取。
- 线程管理:创建固定数量的线程,每个线程从任务队列中获取任务并执行。
- 线程生命周期:线程启动后,不断从任务队列获取任务,若队列为空且有结束信号,则线程退出。
- 资源共享:对于需要共享的资源,如数据库连接等,采用连接池的方式管理,确保线程安全地使用资源。
关键代码片段
import threading
import queue
def worker(task_queue):
while True:
try:
task = task_queue.get(timeout=1)
if task is None:
break
# 执行I/O任务
task()
except queue.Empty:
continue
finally:
task_queue.task_done()
def main():
task_queue = queue.Queue()
num_threads = 5
threads = []
# 创建并启动线程
for _ in range(num_threads):
t = threading.Thread(target=worker, args=(task_queue,))
t.start()
threads.append(t)
# 添加任务到队列
for _ in range(10):
def io_task():
# 模拟I/O操作
pass
task_queue.put(io_task)
# 等待所有任务完成
task_queue.join()
# 发送结束信号给每个线程
for _ in range(num_threads):
task_queue.put(None)
# 等待所有线程结束
for t in threads:
t.join()
if __name__ == "__main__":
main()
说明
- 线程生命周期管理:
- 线程启动后进入
worker
函数的循环,task_queue.get(timeout=1)
从任务队列获取任务,设置超时避免线程在队列为空时一直阻塞。
- 当获取到
None
任务时,线程跳出循环,结束生命周期。
task_queue.join()
等待所有任务完成,task_queue.task_done()
用于标记任务完成。
- 任务队列:
queue.Queue
是线程安全的,put
方法添加任务,get
方法获取任务。
- 线程间资源共享:
- 代码中虽未体现具体的资源共享,但对于如数据库连接等共享资源,可以在主线程创建连接池,在
worker
函数中从连接池获取连接执行I/O任务,使用完毕后归还连接到连接池,以此保证资源的线程安全使用。