面试题答案
一键面试import queue
import threading
import time
def producer(q):
num = 0
while True:
try:
q.put(num, block=True, timeout=1)
print(f"生产者放入: {num}")
num += 1
time.sleep(1)
except queue.Full:
print("队列已满,等待队列有空余位置...")
def consumer(q):
while True:
try:
item = q.get(block=True, timeout=1)
print(f"消费者取出: {item}")
q.task_done()
except queue.Empty:
print("队列为空,等待新数据...")
if __name__ == "__main__":
q = queue.Queue(maxsize=5)
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
上述代码使用queue
模块实现了一个简单的生产者 - 消费者模型:
- 生产者函数
producer
:- 不断生成数字并尝试放入队列
q
,每次放入后等待1秒。 - 使用
try - except
捕获queue.Full
异常,当队列满时打印提示信息并等待。
- 不断生成数字并尝试放入队列
- 消费者函数
consumer
:- 不断从队列
q
中取出数据并打印,使用try - except
捕获queue.Empty
异常,当队列为空时打印提示信息并等待。 - 使用
q.task_done()
通知队列任务已完成。
- 不断从队列
- 主程序:
- 创建一个最大容量为5的队列
q
。 - 分别创建生产者线程和消费者线程并启动,最后等待两个线程结束。
- 创建一个最大容量为5的队列