- 关键代码片段
import threading
import queue
import time
# 工作线程函数
def worker(q):
while True:
try:
task = q.get(timeout=1)
print(f"线程 {threading.current_thread().name} 开始处理任务: {task}")
time.sleep(1) # 模拟任务处理时间
print(f"线程 {threading.current_thread().name} 完成任务: {task}")
q.task_done()
except queue.Empty:
break
if __name__ == "__main__":
task_queue = queue.Queue()
# 创建并启动工作线程
num_workers = 3
threads = []
for i in range(num_workers):
t = threading.Thread(target=worker, args=(task_queue,))
t.start()
threads.append(t)
# 主线程生成任务并放入队列
for i in range(10):
task = f"任务 {i}"
task_queue.put(task)
print(f"主线程放入任务: {task}")
# 等待所有任务完成
task_queue.join()
# 通知工作线程结束
for _ in range(num_workers):
task_queue.put(None)
# 等待所有工作线程结束
for t in threads:
t.join()
- 原理解释
Queue
模块:Queue
是线程安全的队列,在多线程编程中用于安全地在不同线程间传递数据。Queue
内部使用锁机制来确保在多线程环境下数据的一致性和线程安全。
- 工作线程部分:
while True:
循环确保线程持续运行,不断从队列中获取任务。
try: task = q.get(timeout=1)
尝试从队列 q
中获取任务,设置 timeout
为1秒,如果1秒内队列为空则抛出 queue.Empty
异常。
- 获取到任务后,线程开始处理任务,这里使用
time.sleep(1)
模拟任务处理时间。
- 任务处理完成后,调用
q.task_done()
通知队列该任务已完成,这一步很重要,Queue
使用这个信号来判断所有任务是否都已被处理。
except queue.Empty:
捕获 queue.Empty
异常,当队列为空且设置了 timeout
时会触发该异常,线程在捕获到异常后跳出循环结束运行。
- 主线程部分:
- 创建一个
Queue
对象 task_queue
用于存储任务。
- 创建并启动多个工作线程,每个线程的目标函数是
worker
,并将 task_queue
作为参数传递给工作线程。
- 主线程通过
task_queue.put(task)
将任务放入队列中。
task_queue.join()
会阻塞主线程,直到队列中的所有任务都被处理完成,即所有 task_done()
都被调用。
- 为了通知工作线程结束,主线程向队列中放入
None
作为结束信号,数量与工作线程数相同。每个工作线程在获取到 None
时会结束循环。
- 最后通过
t.join()
等待所有工作线程实际结束运行。