MST
星途 面试题库

面试题:Python if语句在分布式并发编程同步控制的优化

在一个基于Python的分布式系统中,不同节点之间需要进行并发任务的同步控制。假设使用`if`语句结合消息队列(如RabbitMQ)来实现同步逻辑,当某个节点完成特定任务后,通过消息通知其他节点进行后续操作。请详细说明如何设计高效的同步控制机制,包括如何处理网络延迟、消息丢失等问题,并且给出核心代码框架。
47.9万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务完成通知:当节点完成特定任务后,向RabbitMQ发送消息通知其他节点。
  2. 网络延迟处理:设置合理的消息发送和接收超时机制,并且采用重试策略。例如,使用tenacity库来实现重试逻辑。
  3. 消息丢失处理:利用RabbitMQ的持久化特性,确保消息不会因为服务器重启等原因丢失。同时,使用消息确认机制(ACK),确保消息被正确接收。

核心代码框架

  1. 发送端代码(完成任务后发送通知)
import pika
from tenacity import retry, stop_after_attempt, wait_fixed


class TaskSender:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='task_notification', durable=True)

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    def send_task_completion(self):
        try:
            self.channel.basic_publish(
                exchange='',
                routing_key='task_notification',
                body='Task completed',
                properties=pika.BasicProperties(
                    delivery_mode=2,  # 使消息持久化
                )
            )
            print("Task completion message sent.")
        except pika.exceptions.AMQPError as e:
            print(f"Failed to send message: {e}")
            raise

    def close(self):
        self.connection.close()


if __name__ == "__main__":
    sender = TaskSender()
    sender.send_task_completion()
    sender.close()
  1. 接收端代码(接收通知并进行后续操作)
import pika


def callback(ch, method, properties, body):
    print(f"Received message: {body.decode()}")
    # 这里添加接收到消息后的后续操作代码
    ch.basic_ack(delivery_tag=method.delivery_tag)


def receive_task_notification():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_notification', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='task_notification', on_message_callback=callback)

    print('Waiting for messages...')
    channel.start_consuming()


if __name__ == "__main__":
    receive_task_notification()

解释

  1. 发送端
    • 使用pika库连接RabbitMQ。
    • TaskSender类封装了发送逻辑,send_task_completion方法负责发送任务完成消息,使用@retry装饰器实现重试。
    • durable=True确保队列持久化,delivery_mode=2确保消息持久化。
  2. 接收端
    • 同样使用pika库连接RabbitMQ。
    • callback函数处理接收到的消息,basic_ack方法确保消息被正确确认,防止消息丢失。
    • basic_qos(prefetch_count=1)设置每次只处理一条消息,避免接收端过载。