消息发送
- 确定延迟级别:RocketMQ 支持的延迟级别默认是 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ,可根据需求选择合适的延迟级别,15分钟未支付取消订单可选择延迟级别为15分钟对应的级别,取消前5分钟提醒可选择延迟级别为10分钟对应的级别。
- 构建消息体:在下单成功后,创建两条延迟消息。一条用于15分钟后取消订单,消息体中包含订单ID等必要信息;另一条用于10分钟后发送提醒消息,同样包含订单ID等关键信息。
- 发送消息:使用 RocketMQ 的生产者 API 发送这两条延迟消息到对应的主题(例如,主题命名为
order_delay_topic
)。示例代码如下(以Java为例):
DefaultMQProducer producer = new DefaultMQProducer("group_name");
producer.setNamesrvAddr("namesrv_ip:port");
producer.start();
Message cancelOrderMessage = new Message("order_delay_topic", "cancel_order_tag", orderId.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置15分钟延迟级别
cancelOrderMessage.setDelayTimeLevel(15);
SendResult cancelSendResult = producer.send(cancelOrderMessage);
Message reminderMessage = new Message("order_delay_topic", "reminder_tag", orderId.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置10分钟延迟级别
reminderMessage.setDelayTimeLevel(10);
SendResult reminderSendResult = producer.send(reminderMessage);
producer.shutdown();
消息接收处理逻辑
- 消费者配置:创建 RocketMQ 的消费者,订阅
order_delay_topic
主题,并设置消费者组(例如consumer_group
)。
- 消息处理:
- 提醒消息处理:消费者接收到10分钟延迟的提醒消息后,解析出订单ID,根据订单ID查询订单状态,若订单未支付,则调用短信服务、邮件服务等发送提醒消息给用户。示例代码如下(以Java为例):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("namesrv_ip:port");
consumer.subscribe("order_delay_topic", "reminder_tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
// 查询订单状态
if (!isOrderPaid(orderId)) {
// 发送提醒消息
sendReminderMessage(orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
- **取消订单消息处理**:消费者接收到15分钟延迟的取消订单消息后,解析出订单ID,查询订单状态,若订单未支付,则调用订单取消接口,更新订单状态为取消,并进行相应的库存回滚等操作。示例代码如下(以Java为例):
consumer.subscribe("order_delay_topic", "cancel_order_tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
// 查询订单状态
if (!isOrderPaid(orderId)) {
// 取消订单
cancelOrder(orderId);
// 库存回滚
rollbackStock(orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
保证可靠性和一致性
- 可靠性:
- 消息持久化:RocketMQ 默认将消息持久化到磁盘,确保消息在服务器重启等情况下不丢失。可通过配置
storePathCommitLog
等参数来指定消息存储路径和相关策略。
- 生产者重试:在消息发送时,生产者会根据配置的重试次数进行重试。若消息发送失败,会自动重试一定次数,以确保消息能够成功发送到 Broker。例如在上述生产者代码中,RocketMQ 会自动重试消息发送。
- 消费者重试:当消费者消费消息失败时,RocketMQ 会根据配置进行重试。默认情况下,消息会被重试16次。对于一些幂等性操作(如取消订单、更新订单状态),可以在消费逻辑中做好幂等性处理,避免重复操作导致数据不一致。例如在取消订单接口中,根据订单ID查询订单状态,若订单已经取消则不重复操作。
- 一致性:
- 分布式事务消息:下单操作和消息发送可以通过 RocketMQ 的分布式事务消息来保证一致性。在下单时,先发送半消息,下单成功后再提交消息。如果下单失败,则回滚半消息。示例代码如下(以Java为例):
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setNamesrvAddr("namesrv_ip:port");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 下单操作
boolean orderResult = createOrder((Order) arg);
if (orderResult) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查订单状态,确认是否需要提交或回滚消息
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Order order = new Order();
// 构建订单信息
Message message = new Message("order_delay_topic", "transaction_tag", order.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(message, order);
- **消息幂等性**:在消费者处理消息时,通过数据库唯一约束、状态机等方式保证消息处理的幂等性。例如在取消订单时,以订单ID作为唯一键,若订单已取消则不重复取消,避免多次取消导致数据不一致。