import threading
import queue
class ProducerConsumerModel:
def __init__(self, max_size):
self.queue = queue.Queue(maxsize=max_size)
self.condition = threading.Condition()
def producer(self, item):
with self.condition:
while self.queue.full():
print("Queue is full, producer waiting...")
self.condition.wait()
self.queue.put(item)
print(f"Produced: {item}")
self.condition.notify()
def consumer(self):
with self.condition:
while self.queue.empty():
print("Queue is empty, consumer waiting...")
self.condition.wait()
item = self.queue.get()
print(f"Consumed: {item}")
self.condition.notify()
# 示例使用
max_size = 5
model = ProducerConsumerModel(max_size)
producer_thread1 = threading.Thread(target=model.producer, args=(1,))
producer_thread2 = threading.Thread(target=model.producer, args=(2,))
consumer_thread1 = threading.Thread(target=model.consumer)
consumer_thread2 = threading.Thread(target=model.consumer)
producer_thread1.start()
producer_thread2.start()
consumer_thread1.start()
consumer_thread2.start()
producer_thread1.join()
producer_thread2.join()
consumer_thread1.join()
consumer_thread2.join()
Condition的wait
、notify
和notify_all
方法在这个场景中的作用和调用时机:
wait
方法:
- 作用:使当前线程等待,同时释放相关的锁(在
with
语句块中获取的锁),这样其他线程就可以获取锁并修改共享资源。当线程被唤醒时,会重新获取锁并继续执行wait
之后的代码。
- 调用时机:在生产者线程中,当共享队列已满(
self.queue.full()
)时,生产者调用self.condition.wait()
,等待队列有空间。在消费者线程中,当共享队列已空(self.queue.empty()
)时,消费者调用self.condition.wait()
,等待队列中有数据。
notify
方法:
- 作用:唤醒一个等待在
Condition
对象上的线程。如果有多个线程在等待,选择哪一个线程被唤醒是不确定的。
- 调用时机:在生产者往队列中添加数据后(
self.queue.put(item)
),调用self.condition.notify()
,通知可能在等待队列有数据的消费者线程。在消费者从队列中取出数据后(self.queue.get()
),调用self.condition.notify()
,通知可能在等待队列有空间的生产者线程。
notify_all
方法:
- 作用:唤醒所有等待在
Condition
对象上的线程。
- 调用时机:在这个场景中,如果希望同时唤醒所有等待的生产者和消费者线程,可以在某些关键操作后调用
self.condition.notify_all()
。例如,在程序结束前要确保所有等待的线程都能收到通知并退出。但一般情况下,notify
方法已经能满足生产者 - 消费者模型的基本需求,因为每次操作通常只需要唤醒一个等待的线程。