RocketMQ本身特性实现幂等性
- 实现思路:
- RocketMQ 本身提供了幂等性保障机制。生产者发送消息时,会为每个消息生成唯一的
MessageId
或 TransactionId
(在事务消息场景下)。消费者在消费消息时,可以根据这些唯一标识来判断消息是否已经被消费过。
- 消费者端在处理消息前,先查询本地的消费记录(可以是数据库、缓存等存储),如果该消息已处理过,则直接返回成功,不再重复处理。
- 代码示例:
- 假设使用数据库来记录消费状态,创建一个
message_consume
表,包含 message_id
(消息唯一标识)和 consume_status
(消费状态,例如 0:未消费,1:已消费)字段。
- 消费者代码如下:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
public class YourConsumer implements RocketMQListener<String> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void onMessage(String message) {
String messageId = "get_message_id_from_message";// 实际需要从消息中获取MessageId
int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM message_consume WHERE message_id =?", Integer.class, messageId);
if (count > 0) {
// 已消费过,直接返回
return;
}
try {
// 处理消息业务逻辑
System.out.println("处理消息: " + message);
// 记录消费状态
jdbcTemplate.update("INSERT INTO message_consume (message_id, consume_status) VALUES (?, 1)", messageId);
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
}
}
Spring Cloud应用层面实现幂等性
- 实现思路:
- 在 Spring Cloud 应用中,可以利用分布式缓存(如 Redis)来实现幂等性。当接收到消息时,先尝试将消息的唯一标识存入 Redis,如果存入成功,说明该消息是首次处理,执行消息处理逻辑;如果存入失败(说明该消息已处理过,因为 Redis 的
SETNX
操作具有原子性,同一 key 无法重复设置成功),则直接返回成功。
- 代码示例:
- 首先,引入 Redis 依赖(假设使用 Spring Boot Starter Redis):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
public class YourConsumerWithRedis implements RocketMQListener<String> {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(String message) {
String messageId = "get_message_id_from_message";// 实际需要从消息中获取MessageId
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("message:" + messageId, "processed");
if (result!= null && result) {
// 首次处理,执行消息处理逻辑
System.out.println("处理消息: " + message);
} else {
// 已处理过,直接返回
return;
}
}
}