技术选型
- Spring Cloud Stream:用于构建消息驱动微服务的框架,它提供了对不同消息中间件的统一抽象,如RabbitMQ、Kafka 等。
- 消息中间件:
- RabbitMQ:轻量级,支持多种协议,可靠性高,适合企业级应用。它具有消息持久化、事务等特性保证消息可靠传递。
- Kafka:高吞吐量,适合处理大量数据的实时流处理场景。它通过分区、副本机制保证数据的可靠性和高可用性。
消息传递机制
- 订单服务:
- 当订单创建时,订单服务通过Spring Cloud Stream的
Binder
将消息发送到指定的消息中间件队列(以RabbitMQ为例)。例如,定义一个输出通道OrderOutput
:
public interface OrderOutput {
String CHANNEL = "order-outbound";
@Output(CHANNEL)
MessageChannel orderOutbound();
}
- 然后在订单创建逻辑中,通过`OrderOutput`发送消息:
@Autowired
private OrderOutput orderOutput;
public void createOrder(Order order) {
orderOutput.orderOutbound().send(MessageBuilder.withPayload(order).build());
}
- 库存服务:
- 库存服务通过Spring Cloud Stream的
Binder
从消息中间件队列接收消息。定义一个输入通道InventoryInput
:
public interface InventoryInput {
String CHANNEL = "order-inbound";
@Input(CHANNEL)
SubscribableChannel orderInbound();
}
- 并创建一个消息监听器来处理接收到的消息:
@StreamListener(InventoryInput.CHANNEL)
public void handleOrder(Order order) {
// 减少库存逻辑
inventoryService.reduceInventory(order.getProductId(), order.getQuantity());
}
可能遇到的问题及解决方案
- 消息丢失问题:
- 原因:在消息发送、传输或接收过程中可能出现异常导致消息丢失。
- 解决方案:
- 消息持久化:对于RabbitMQ,将队列和消息都设置为持久化。在Spring Cloud Stream配置中,可以通过
spring.rabbitmq.template.delivery-mode: 2
设置消息持久化。对于Kafka,通过设置acks = all
确保消息被所有副本确认。
- 事务机制:在RabbitMQ中,可以使用事务。在订单服务发送消息时,开启事务,发送成功后提交事务,失败则回滚。但事务会影响性能,应谨慎使用。
- 发送确认机制:RabbitMQ支持发送确认(
confirm
)和返回确认(return
)。订单服务配置spring.rabbitmq.publisher-confirm-type: correlated
和spring.rabbitmq.publisher-returns: true
,并实现RabbitTemplate.ConfirmCallback
和RabbitTemplate.ReturnCallback
接口,确保消息成功发送到Broker。
- 消息重复消费问题:
- 原因:在网络波动、消息中间件故障恢复等情况下,可能会导致消息被重复投递。
- 解决方案:
- 幂等性处理:库存服务在处理消息时保证幂等性。例如,在减少库存方法中,先查询当前库存,判断是否足够减少,如果足够则进行减少操作。或者使用数据库的唯一约束,每次减少库存时插入一条记录,如果插入失败(说明已处理过)则忽略。
- 消息去重表:使用数据库或Redis等存储构建消息去重表。库存服务在处理消息前,先查询去重表,判断该消息是否已处理过。
- 消息顺序性问题:
- 原因:在高并发场景下,消息中间件可能无法保证消息的顺序性。
- 解决方案:
- 分区策略:对于Kafka,根据订单相关的关键信息(如订单号、用户ID等)进行分区,保证同一业务逻辑相关的消息发送到同一个分区,消费时按分区顺序消费。
- 全局序列号:订单服务在发送消息时为每个消息添加全局唯一序列号,库存服务按序列号顺序处理消息。