设计思路
- 任务完成通知:当节点完成特定任务后,向RabbitMQ发送消息通知其他节点。
- 网络延迟处理:设置合理的消息发送和接收超时机制,并且采用重试策略。例如,使用
tenacity
库来实现重试逻辑。
- 消息丢失处理:利用RabbitMQ的持久化特性,确保消息不会因为服务器重启等原因丢失。同时,使用消息确认机制(ACK),确保消息被正确接收。
核心代码框架
- 发送端代码(完成任务后发送通知)
import pika
from tenacity import retry, stop_after_attempt, wait_fixed
class TaskSender:
def __init__(self, host='localhost'):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='task_notification', durable=True)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def send_task_completion(self):
try:
self.channel.basic_publish(
exchange='',
routing_key='task_notification',
body='Task completed',
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
)
)
print("Task completion message sent.")
except pika.exceptions.AMQPError as e:
print(f"Failed to send message: {e}")
raise
def close(self):
self.connection.close()
if __name__ == "__main__":
sender = TaskSender()
sender.send_task_completion()
sender.close()
- 接收端代码(接收通知并进行后续操作)
import pika
def callback(ch, method, properties, body):
print(f"Received message: {body.decode()}")
# 这里添加接收到消息后的后续操作代码
ch.basic_ack(delivery_tag=method.delivery_tag)
def receive_task_notification():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_notification', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_notification', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()
if __name__ == "__main__":
receive_task_notification()
解释
- 发送端:
- 使用
pika
库连接RabbitMQ。
TaskSender
类封装了发送逻辑,send_task_completion
方法负责发送任务完成消息,使用@retry
装饰器实现重试。
durable=True
确保队列持久化,delivery_mode=2
确保消息持久化。
- 接收端:
- 同样使用
pika
库连接RabbitMQ。
callback
函数处理接收到的消息,basic_ack
方法确保消息被正确确认,防止消息丢失。
basic_qos(prefetch_count=1)
设置每次只处理一条消息,避免接收端过载。