1. 架构设计
- 生产者:
- 每个生产者服务器使用
pika
库连接到RabbitMQ服务器。
- 生产者发送消息时,设置消息持久化,确保RabbitMQ服务器重启后消息不丢失。
- 使用事务机制(
channel.tx_select()
,channel.tx_commit()
,channel.tx_rollback()
)来保证消息发送的可靠性。如果消息发送失败,可以回滚事务。
- 消费者:
- 每个消费者实例使用
pika
库连接到RabbitMQ服务器。
- 消费者设置为手动确认消息(
auto_ack=False
),确保在消息处理完成后才向RabbitMQ确认,避免消息丢失。
- 如果需要保证消息顺序性,使用单队列单消费者模式。如果有多个消费者实例,可以通过设置队列的
x - max - length - per - consumer
参数,将队列消息均匀分配给每个消费者,并且每个消费者按顺序处理消息。
- RabbitMQ服务器:
- 使用镜像队列(
ha - mode: all
)来保证RabbitMQ服务器的高可用性,防止单个节点崩溃导致服务不可用。
- 配置合适的磁盘配额和内存限制,避免因资源耗尽导致消息丢失。
2. 代码实现
生产者代码
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq_server_host'))
channel = connection.channel()
# 声明队列,设置为持久化
channel.queue_declare(queue='my_queue', durable=True)
# 开启事务
channel.tx_select()
try:
message = "Hello, RabbitMQ!"
# 发送消息,设置为持久化
channel.basic_publish(exchange='',
routing_key='my_queue',
body=message,
properties=pika.BasicProperties(delivery_mode=2))
print(" [x] Sent %r" % message)
channel.tx_commit()
except Exception as e:
print(f" [x] Send message failed: {e}")
channel.tx_rollback()
finally:
connection.close()
消费者代码
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq_server_host'))
channel = connection.channel()
# 声明队列,设置为持久化
channel.queue_declare(queue='my_queue', durable=True)
# 定义回调函数处理消息
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 模拟消息处理
import time
time.sleep(1)
print(" [x] Done")
# 手动确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
# 设置每次只接收一条消息,处理完成后再接收下一条
channel.basic_qos(prefetch_count=1)
# 启动消费者,设置为手动确认消息
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
3. 异常处理
- 网络故障:
pika
库在网络故障时会抛出异常。生产者和消费者代码捕获异常后,可以尝试重新连接RabbitMQ服务器。例如,在生产者代码中,捕获异常后可以重新创建BlockingConnection
和channel
,重新发送消息。
- 消费者代码捕获异常后,重新连接并重新启动消费。
- 节点崩溃:
- 由于使用了镜像队列,当某个RabbitMQ节点崩溃时,其他节点可以继续提供服务。但是生产者和消费者可能需要重新连接到新的活动节点。可以使用RabbitMQ的集群拓扑发现机制(如
pika
库支持的自动重连功能)来处理这种情况。在连接字符串中可以指定多个RabbitMQ节点地址,pika
库会尝试连接列表中的节点,直到成功连接。例如:pika.ConnectionParameters(host=['node1', 'node2', 'node3'])
。