设计思路
- 消息队列选择:可以选择 RabbitMQ、Kafka 等成熟的消息队列系统。以 RabbitMQ 为例,它支持多种消息协议,可靠性高,适合分布式系统。
- 订单创建:用户下单时,生成订单数据并发送订单创建消息到消息队列。
- 库存扣减:消费者监听订单创建消息,获取订单信息,尝试扣减库存。如果库存不足,发送回滚订单消息到消息队列;如果扣减成功,发送支付处理消息。
- 支付处理:消费者监听支付处理消息,进行支付操作。
- 幂等性处理:为每个消息添加唯一标识,在处理消息前先检查该标识是否已处理过。
关键代码逻辑
- 订单创建
require 'bunny'
# 连接 RabbitMQ
connection = Bunny.new
connection.start
channel = connection.create_channel
queue = channel.queue('order_create_queue')
order = { order_id: SecureRandom.uuid, user_id: 1, product_ids: [1, 2, 3] }
queue.publish(order.to_json)
connection.close
- 库存扣减
require 'bunny'
connection = Bunny.new
connection.start
channel = connection.create_channel
order_queue = channel.queue('order_create_queue')
rollback_queue = channel.queue('order_rollback_queue')
payment_queue = channel.queue('payment_process_queue')
order_queue.subscribe(block: true) do |delivery_info, properties, body|
order = JSON.parse(body)
product_ids = order['product_ids']
all_in_stock = product_ids.all? { |product_id| check_stock(product_id) }
if all_in_stock
product_ids.each { |product_id| deduct_stock(product_id) }
payment_queue.publish(order.to_json)
else
rollback_queue.publish(order.to_json)
end
end
connection.close
def check_stock(product_id)
# 检查库存逻辑,返回 true 或 false
true
end
def deduct_stock(product_id)
# 扣减库存逻辑
end
- 订单回滚
require 'bunny'
connection = Bunny.new
connection.start
channel = connection.create_channel
rollback_queue = channel.queue('order_rollback_queue')
rollback_queue.subscribe(block: true) do |delivery_info, properties, body|
order = JSON.parse(body)
# 回滚订单逻辑,例如删除订单记录
end
connection.close
- 支付处理
require 'bunny'
connection = Bunny.new
connection.start
channel = connection.create_channel
payment_queue = channel.queue('payment_process_queue')
payment_queue.subscribe(block: true) do |delivery_info, properties, body|
order = JSON.parse(body)
# 支付处理逻辑
end
connection.close
- 幂等性处理
require 'bunny'
require 'set'
processed_messages = Set.new
connection = Bunny.new
connection.start
channel = connection.create_channel
queue = channel.queue('any_queue')
queue.subscribe(block: true) do |delivery_info, properties, body|
message_id = properties.message_id
unless processed_messages.include?(message_id)
processed_messages.add(message_id)
# 正常消息处理逻辑
end
end
connection.close
可能面临的挑战及解决方案
- 消息丢失:
- 挑战:消息在发送、传输或消费过程中可能丢失。
- 解决方案:在 RabbitMQ 中,使用持久化队列和消息,并且开启 confirm 模式,确保消息发送成功。消费者端使用手动确认机制,处理完消息后再确认。
- 消息顺序性:
- 挑战:在高并发情况下,消息可能乱序到达消费者。
- 解决方案:对于有顺序要求的消息,将其发送到同一个队列,并且消费者采用单线程处理该队列的消息。
- 系统性能:
- 挑战:大量消息的处理可能导致系统性能瓶颈。
- 解决方案:合理设置消息队列的参数,如队列大小、预取数量等。同时,可以采用多消费者并行处理消息,但要注意幂等性处理。