MST

星途 面试题库

面试题:消息队列RocketMQ与Spring Cloud集成下的消息幂等性处理

在RocketMQ与Spring Cloud集成的场景下,由于网络等原因可能导致消息重复消费。请阐述如何保证消息的幂等性,从RocketMQ本身特性以及Spring Cloud应用层面分别说明实现思路和具体代码示例(以Java为例)。
19.7万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

RocketMQ本身特性实现幂等性

  1. 实现思路
    • RocketMQ 本身提供了幂等性保障机制。生产者发送消息时,会为每个消息生成唯一的 MessageIdTransactionId(在事务消息场景下)。消费者在消费消息时,可以根据这些唯一标识来判断消息是否已经被消费过。
    • 消费者端在处理消息前,先查询本地的消费记录(可以是数据库、缓存等存储),如果该消息已处理过,则直接返回成功,不再重复处理。
  2. 代码示例
    • 假设使用数据库来记录消费状态,创建一个 message_consume 表,包含 message_id(消息唯一标识)和 consume_status(消费状态,例如 0:未消费,1:已消费)字段。
    • 消费者代码如下:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
public class YourConsumer implements RocketMQListener<String> {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void onMessage(String message) {
        String messageId = "get_message_id_from_message";// 实际需要从消息中获取MessageId
        int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM message_consume WHERE message_id =?", Integer.class, messageId);
        if (count > 0) {
            // 已消费过,直接返回
            return;
        }
        try {
            // 处理消息业务逻辑
            System.out.println("处理消息: " + message);
            // 记录消费状态
            jdbcTemplate.update("INSERT INTO message_consume (message_id, consume_status) VALUES (?, 1)", messageId);
        } catch (Exception e) {
            // 处理异常
            e.printStackTrace();
        }
    }
}

Spring Cloud应用层面实现幂等性

  1. 实现思路
    • 在 Spring Cloud 应用中,可以利用分布式缓存(如 Redis)来实现幂等性。当接收到消息时,先尝试将消息的唯一标识存入 Redis,如果存入成功,说明该消息是首次处理,执行消息处理逻辑;如果存入失败(说明该消息已处理过,因为 Redis 的 SETNX 操作具有原子性,同一 key 无法重复设置成功),则直接返回成功。
  2. 代码示例
    • 首先,引入 Redis 依赖(假设使用 Spring Boot Starter Redis):
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • 消费者代码如下:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
public class YourConsumerWithRedis implements RocketMQListener<String> {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(String message) {
        String messageId = "get_message_id_from_message";// 实际需要从消息中获取MessageId
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("message:" + messageId, "processed");
        if (result!= null && result) {
            // 首次处理,执行消息处理逻辑
            System.out.println("处理消息: " + message);
        } else {
            // 已处理过,直接返回
            return;
        }
    }
}