MST

星途 面试题库

面试题:Python 生产者 - 消费者问题之分布式场景

在分布式环境中,使用 Python 结合消息队列如 RabbitMQ 实现生产者 - 消费者系统。生产者分布在多个不同的服务器上,消费者也可能有多个实例运行在不同服务器。请设计并实现一个可靠的架构,确保消息不丢失,处理消息的顺序性(如果有需要),同时考虑如何应对网络故障和节点崩溃等异常情况。
21.8万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

1. 架构设计

  1. 生产者
    • 每个生产者服务器使用pika库连接到RabbitMQ服务器。
    • 生产者发送消息时,设置消息持久化,确保RabbitMQ服务器重启后消息不丢失。
    • 使用事务机制(channel.tx_select()channel.tx_commit()channel.tx_rollback())来保证消息发送的可靠性。如果消息发送失败,可以回滚事务。
  2. 消费者
    • 每个消费者实例使用pika库连接到RabbitMQ服务器。
    • 消费者设置为手动确认消息(auto_ack=False),确保在消息处理完成后才向RabbitMQ确认,避免消息丢失。
    • 如果需要保证消息顺序性,使用单队列单消费者模式。如果有多个消费者实例,可以通过设置队列的x - max - length - per - consumer参数,将队列消息均匀分配给每个消费者,并且每个消费者按顺序处理消息。
  3. RabbitMQ服务器
    • 使用镜像队列(ha - mode: all)来保证RabbitMQ服务器的高可用性,防止单个节点崩溃导致服务不可用。
    • 配置合适的磁盘配额和内存限制,避免因资源耗尽导致消息丢失。

2. 代码实现

生产者代码

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq_server_host'))
channel = connection.channel()

# 声明队列,设置为持久化
channel.queue_declare(queue='my_queue', durable=True)

# 开启事务
channel.tx_select()
try:
    message = "Hello, RabbitMQ!"
    # 发送消息,设置为持久化
    channel.basic_publish(exchange='',
                          routing_key='my_queue',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2))
    print(" [x] Sent %r" % message)
    channel.tx_commit()
except Exception as e:
    print(f" [x] Send message failed: {e}")
    channel.tx_rollback()
finally:
    connection.close()

消费者代码

import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq_server_host'))
channel = connection.channel()

# 声明队列,设置为持久化
channel.queue_declare(queue='my_queue', durable=True)

# 定义回调函数处理消息
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 模拟消息处理
    import time
    time.sleep(1)
    print(" [x] Done")
    # 手动确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 设置每次只接收一条消息,处理完成后再接收下一条
channel.basic_qos(prefetch_count=1)
# 启动消费者,设置为手动确认消息
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

3. 异常处理

  1. 网络故障
    • pika库在网络故障时会抛出异常。生产者和消费者代码捕获异常后,可以尝试重新连接RabbitMQ服务器。例如,在生产者代码中,捕获异常后可以重新创建BlockingConnectionchannel,重新发送消息。
    • 消费者代码捕获异常后,重新连接并重新启动消费。
  2. 节点崩溃
    • 由于使用了镜像队列,当某个RabbitMQ节点崩溃时,其他节点可以继续提供服务。但是生产者和消费者可能需要重新连接到新的活动节点。可以使用RabbitMQ的集群拓扑发现机制(如pika库支持的自动重连功能)来处理这种情况。在连接字符串中可以指定多个RabbitMQ节点地址,pika库会尝试连接列表中的节点,直到成功连接。例如:pika.ConnectionParameters(host=['node1', 'node2', 'node3'])