MST

星途 面试题库

面试题:消息队列回调失败重试策略之常见场景

在常见的后端开发场景中,使用消息队列时,列举至少3种可能导致回调失败的场景,并简单阐述针对这些场景,如何设计基本的重试策略?
21.6万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

可能导致回调失败的场景

  1. 网络波动:网络不稳定可能导致消息发送或接收失败,例如网络延迟、丢包等情况,使得回调无法正常进行。
  2. 服务端过载:当消息队列服务端或目标处理服务负载过高时,可能无法及时处理回调请求,导致失败。
  3. 消息格式错误:如果消息在传输过程中出现格式损坏,接收方无法正确解析消息,会导致回调失败。
  4. 目标服务故障:目标处理回调的服务出现故障,如崩溃、未启动等,无法响应回调请求。

重试策略设计

  1. 固定间隔重试
    • 策略:每次回调失败后,等待固定的时间间隔(如1秒、5秒等)后重试。
    • 适用场景:适用于因网络瞬时波动等临时性问题导致的回调失败。
    • 示例代码(Python + RabbitMQ)
import pika
import time

retry_count = 0
max_retries = 3
retry_delay = 1

while True:
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        # 发送消息等操作
        break
    except pika.exceptions.AMQPConnectionError as e:
        if retry_count < max_retries:
            print(f"连接失败,重试 {retry_count + 1} 次。错误: {e}")
            time.sleep(retry_delay)
            retry_count += 1
        else:
            print(f"达到最大重试次数,放弃。错误: {e}")
            break
  1. 指数退避重试
    • 策略:每次重试的间隔时间以指数形式增长(例如2的幂次方,1秒、2秒、4秒、8秒等)。这样可以避免在故障未恢复时过于频繁地重试,同时随着时间推移增加重试频率。
    • 适用场景:适用于服务端过载或目标服务临时故障等情况,可能需要一定时间恢复的场景。
    • 示例代码(Java + Kafka)
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {
    private static final int MAX_RETRIES = 3;
    private static final int BASE_RETRY_DELAY = 1000; // 1秒

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");

        for (int i = 0; i < MAX_RETRIES; i++) {
            try {
                producer.send(record).get();
                System.out.println("消息发送成功");
                break;
            } catch (InterruptedException | ExecutionException e) {
                if (i < MAX_RETRIES - 1) {
                    int delay = (int) (BASE_RETRY_DELAY * Math.pow(2, i));
                    System.out.println("发送失败,重试 " + (i + 1) + " 次。错误: " + e.getMessage() + ",延迟 " + delay + " 毫秒后重试");
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    System.out.println("达到最大重试次数,放弃。错误: " + e.getMessage());
                }
            }
        }
        producer.close();
    }
}
  1. 随机间隔重试
    • 策略:每次重试的间隔时间在一定范围内随机取值(例如1 - 5秒之间随机)。这可以避免大量重试请求同时到达,减轻服务端压力。
    • 适用场景:适用于多个客户端同时重试可能导致服务端压力增大的场景。
    • 示例代码(Node.js + Redis)
const redis = require('redis');
const { promisify } = require('util');

const client = redis.createClient();
const setexAsync = promisify(client.setex).bind(client);

const maxRetries = 3;
const minRetryDelay = 1000; // 1秒
const maxRetryDelay = 5000; // 5秒

async function sendMessage() {
    let retryCount = 0;
    while (retryCount < maxRetries) {
        try {
            await setexAsync('message-key', 3600, 'message-value');
            console.log('消息发送成功');
            break;
        } catch (error) {
            const delay = Math.floor(Math.random() * (maxRetryDelay - minRetryDelay + 1)) + minRetryDelay;
            console.log(`发送失败,重试 ${retryCount + 1} 次。错误: ${error.message},延迟 ${delay} 毫秒后重试`);
            await new Promise(resolve => setTimeout(resolve, delay));
            retryCount++;
        }
    }
    if (retryCount === maxRetries) {
        console.log('达到最大重试次数,放弃。');
    }
    client.quit();
}

sendMessage();
  1. 结合熔断机制
    • 策略:如果连续多次重试失败(超过一定阈值,如5次),暂时停止重试一段时间(熔断时间,如1分钟),在熔断时间结束后,尝试进行少量试探性重试(如2次),如果成功则恢复正常重试流程,若仍然失败则再次熔断。
    • 适用场景:适用于目标服务出现严重故障,短时间内无法恢复的情况,避免无效重试消耗资源。
    • 示例代码(Go + NATS)
package main

import (
    "fmt"
    "github.com/nats-io/nats.go"
    "time"
)

const (
    maxRetries      = 5
    retryInterval   = 2 * time.Second
    circuitBreakerThreshold = 5
    circuitBreakerDuration = 1 * time.Minute
    probeRetries = 2
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        fmt.Printf("连接失败: %v\n", err)
        return
    }
    defer nc.Close()

    var retryCount int
    var circuitBreakerOpen bool
    var circuitBreakerEnd time.Time

    for {
        if circuitBreakerOpen && time.Now().Before(circuitBreakerEnd) {
            time.Sleep(1 * time.Second)
            continue
        }

        err := nc.Publish("subject", []byte("message"))
        if err == nil {
            fmt.Println("消息发送成功")
            break
        }

        if circuitBreakerOpen {
            if retryCount < probeRetries {
                retryCount++
                time.Sleep(retryInterval)
                continue
            } else {
                circuitBreakerOpen = true
                circuitBreakerEnd = time.Now().Add(circuitBreakerDuration)
                retryCount = 0
                continue
            }
        }

        retryCount++
        if retryCount >= circuitBreakerThreshold {
            circuitBreakerOpen = true
            circuitBreakerEnd = time.Now().Add(circuitBreakerDuration)
            retryCount = 0
        } else {
            time.Sleep(retryInterval)
        }
    }
}