import threading
import queue
import random
import string
def producer(queue_obj):
while True:
letter = random.choice(string.ascii_lowercase)
queue_obj.put(letter)
def consumer(queue_obj, result_dict, lock):
local_counter = {}
while True:
try:
letter = queue_obj.get(timeout=1)
local_counter[letter] = local_counter.get(letter, 0) + 1
queue_obj.task_done()
except queue.Empty:
break
with lock:
for letter, count in local_counter.items():
result_dict[letter] = result_dict.get(letter, 0) + count
if __name__ == "__main__":
q = queue.Queue()
result = {}
lock = threading.Lock()
num_producers = 3
num_consumers = 2
producer_threads = [threading.Thread(target=producer, args=(q,)) for _ in range(num_producers)]
consumer_threads = [threading.Thread(target=consumer, args=(q, result, lock)) for _ in range(num_consumers)]
for producer_thread in producer_threads:
producer_thread.start()
for consumer_thread in consumer_threads:
consumer_thread.start()
for producer_thread in producer_threads:
producer_thread.join()
q.join()
for consumer_thread in consumer_threads:
consumer_thread.join()
print(result)
通过 Queue/queue
模块保证数据一致性和线程安全的解释
- 数据一致性:
Queue
模块的 Queue
类实现了一个先进先出(FIFO)的数据结构。生产者将字母放入队列,消费者从队列中取出字母,这样就保证了字母处理的顺序,从而保证了数据一致性。
- 消费者线程处理完一个字母后,调用
queue_obj.task_done()
方法,该方法用于通知队列任务已完成。而 q.join()
方法会阻塞主线程,直到队列中所有的任务都被完成,这确保了所有生产者生产的数据都被消费者处理,进一步保证了数据一致性。
- 线程安全:
Queue
类内部使用锁机制来管理对队列的访问。当生产者调用 put
方法或消费者调用 get
方法时,Queue
会自动获取相应的锁,防止多个线程同时访问队列导致数据竞争。
- 例如,当一个生产者线程调用
q.put(letter)
时,会自动获取锁,在操作完成后释放锁,这样其他线程就可以安全地访问队列。同样,消费者线程调用 q.get()
时也会获取锁,保证数据的线程安全访问。
- 在更新共享的
result_dict
时,使用了 threading.Lock()
来确保同一时间只有一个线程能够更新字典,避免了数据竞争问题,进一步保证了线程安全。