MST

星途 面试题库

面试题:Python 多线程下 Queue/queue 模块的复杂应用

假设有多个生产者线程,每个生产者线程生成一系列随机字母(a - z)放入队列。同时有多个消费者线程,每个消费者线程从队列中取出字母,并统计每个字母出现的次数,最后汇总到一个总的字典中并打印。要求处理线程安全问题,写出实现代码,并解释如何通过 Queue/queue 模块保证数据一致性和线程安全。
24.1万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试
import threading
import queue
import random
import string


def producer(queue_obj):
    while True:
        letter = random.choice(string.ascii_lowercase)
        queue_obj.put(letter)


def consumer(queue_obj, result_dict, lock):
    local_counter = {}
    while True:
        try:
            letter = queue_obj.get(timeout=1)
            local_counter[letter] = local_counter.get(letter, 0) + 1
            queue_obj.task_done()
        except queue.Empty:
            break
    with lock:
        for letter, count in local_counter.items():
            result_dict[letter] = result_dict.get(letter, 0) + count


if __name__ == "__main__":
    q = queue.Queue()
    result = {}
    lock = threading.Lock()
    num_producers = 3
    num_consumers = 2

    producer_threads = [threading.Thread(target=producer, args=(q,)) for _ in range(num_producers)]
    consumer_threads = [threading.Thread(target=consumer, args=(q, result, lock)) for _ in range(num_consumers)]

    for producer_thread in producer_threads:
        producer_thread.start()

    for consumer_thread in consumer_threads:
        consumer_thread.start()

    for producer_thread in producer_threads:
        producer_thread.join()

    q.join()
    for consumer_thread in consumer_threads:
        consumer_thread.join()

    print(result)

通过 Queue/queue 模块保证数据一致性和线程安全的解释

  1. 数据一致性
    • Queue 模块的 Queue 类实现了一个先进先出(FIFO)的数据结构。生产者将字母放入队列,消费者从队列中取出字母,这样就保证了字母处理的顺序,从而保证了数据一致性。
    • 消费者线程处理完一个字母后,调用 queue_obj.task_done() 方法,该方法用于通知队列任务已完成。而 q.join() 方法会阻塞主线程,直到队列中所有的任务都被完成,这确保了所有生产者生产的数据都被消费者处理,进一步保证了数据一致性。
  2. 线程安全
    • Queue 类内部使用锁机制来管理对队列的访问。当生产者调用 put 方法或消费者调用 get 方法时,Queue 会自动获取相应的锁,防止多个线程同时访问队列导致数据竞争。
    • 例如,当一个生产者线程调用 q.put(letter) 时,会自动获取锁,在操作完成后释放锁,这样其他线程就可以安全地访问队列。同样,消费者线程调用 q.get() 时也会获取锁,保证数据的线程安全访问。
    • 在更新共享的 result_dict 时,使用了 threading.Lock() 来确保同一时间只有一个线程能够更新字典,避免了数据竞争问题,进一步保证了线程安全。