MST

星途 面试题库

面试题:消息队列之RocketMQ消息生产者专家难度题

在分布式系统中,RocketMQ消息生产者遇到网络抖动导致部分消息发送失败,如何设计一个可靠的消息重发机制,确保消息最终一致性且避免重复消费?请给出详细的设计方案及代码示例(可选择熟悉的后端语言)。
13.5万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

设计方案

  1. 消息发送记录:在生产者端,维护一个消息发送记录表,记录每条消息的唯一标识(如消息ID)、发送状态(成功/失败)、发送时间等信息。这个表可以存储在关系型数据库(如MySQL)或分布式存储(如Redis)中。
  2. 定时任务检查:利用定时任务(如Spring Task、Quartz等),定期检查消息发送记录表中发送失败的消息。检查周期可以根据实际情况调整,比如每隔1分钟检查一次。
  3. 幂等性处理
    • 业务层面:在消费者端,设计业务逻辑时确保对于相同消息的多次处理结果是一致的。例如,在更新数据库操作中,使用唯一索引避免重复插入相同数据。
    • 消息层面:RocketMQ提供了消息幂等性支持,通过消息的唯一标识(Message ID)来判断是否为重复消息。消费者在处理消息前,可以先根据消息ID查询本地是否已经处理过该消息。
  4. 重发策略
    • 固定次数重发:设定一个最大重发次数,如3次。每次重发间隔可以适当延长,例如第一次重发间隔10秒,第二次间隔30秒,第三次间隔1分钟。
    • 动态重发:根据网络抖动的情况动态调整重发策略。例如,可以通过监控网络状况,当网络抖动较严重时,增加重发间隔和最大重发次数。

代码示例(Java + RocketMQ + MySQL + Spring Boot)

  1. 引入依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-task</artifactId>
    </dependency>
</dependencies>
  1. 配置文件(application.properties)
spring.datasource.url=jdbc:mysql://localhost:3306/your_database
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

rocketmq.name-server=localhost:9876
rocketmq.producer.group=test-group
  1. 消息发送记录表实体类(MessageSendRecord.java)
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class MessageSendRecord {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String messageId;
    private String topic;
    private String status;
    private long sendTime;

    // 省略getter和setter方法
}
  1. 消息发送记录仓储接口(MessageSendRecordRepository.java)
import org.springframework.data.jpa.repository.JpaRepository;

public interface MessageSendRecordRepository extends JpaRepository<MessageSendRecord, Long> {
    MessageSendRecord findByMessageId(String messageId);
}
  1. RocketMQ生产者配置(RocketMQConfig.java)
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig {
    @Bean
    public RocketMQTemplate rocketMQTemplate() {
        return new RocketMQTemplate();
    }
}
  1. 消息生产者服务(MessageProducerService.java)
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducerService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String topic, String message) {
        try {
            rocketMQTemplate.syncSend(topic, message);
            // 发送成功记录到数据库
        } catch (Exception e) {
            // 发送失败记录到数据库
        }
    }
}
  1. 定时任务重发消息(MessageResendTask.java)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MessageResendTask {
    @Autowired
    private MessageSendRecordRepository messageSendRecordRepository;
    @Autowired
    private MessageProducerService messageProducerService;

    @Scheduled(cron = "0 */1 * * * *") // 每分钟执行一次
    public void resendFailedMessages() {
        // 查询发送失败的消息
        messageSendRecordRepository.findByStatus("FAILED").forEach(record -> {
            try {
                messageProducerService.sendMessage(record.getTopic(), "");
                // 更新发送状态为成功
            } catch (Exception e) {
                // 增加重发次数,超过最大重发次数则标记为永久失败
            }
        });
    }
}
  1. 消费者端幂等性处理(MessageConsumer.java)
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-group")
public class MessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // 根据消息ID检查是否已处理过
        // 处理业务逻辑
    }
}

通过以上设计方案和代码示例,可以实现一个可靠的消息重发机制,确保消息最终一致性且避免重复消费。