MST

星途 面试题库

面试题:Redis事务补偿与消息队列在简单业务场景中的协同

假设你有一个简单的电商下单业务,用户下单后需要扣减库存并生成订单记录。使用Redis事务和消息队列来实现这个过程,当扣减库存成功但生成订单记录失败时,如何利用Redis事务补偿机制和消息队列进行处理,请描述具体步骤及代码实现思路(语言不限)。
47.1万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试

具体步骤

  1. 下单操作
    • 用户发起下单请求,首先开启 Redis 事务。
    • 在事务中,尝试扣减库存,例如使用 INCRBYDECRBY 命令来修改库存数量(假设库存以 Redis 键值对存储,键为商品 ID,值为库存数量)。
    • 如果扣减库存成功,将订单信息(如订单 ID、商品 ID、用户 ID 等)发送到消息队列(例如 RabbitMQ、Kafka 等)。
    • 执行 Redis 事务提交。
  2. 订单记录生成
    • 消息队列消费者监听队列,当接收到订单消息后,尝试生成订单记录到数据库(如 MySQL 等)。
    • 如果生成订单记录成功,继续处理下一条消息。
    • 如果生成订单记录失败,将订单消息重新发送回消息队列(可以设置重试次数等机制),同时利用 Redis 事务补偿机制。
  3. Redis 事务补偿
    • 在 Redis 中,可以设置一个键来记录失败的订单相关信息(例如失败订单的 ID 等)。
    • 当生成订单记录失败时,开启 Redis 事务,将失败订单信息添加到这个记录失败订单的键中(如使用 SETHSET 等命令)。
    • 同时,在 Redis 事务中,根据订单信息,恢复之前扣减的库存(例如使用 INCRBY 命令将库存数量加回去)。
    • 执行 Redis 事务提交。
  4. 后续处理
    • 可以设置定时任务,定期检查 Redis 中记录失败订单的键,对于多次重试仍失败的订单,可以人工介入处理。

代码实现思路(以 Python 为例,结合 Redis 和 RabbitMQ)

  1. 下单逻辑
import redis
import pika

r = redis.Redis(host='localhost', port=6379, db = 0)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')

def place_order(user_id, product_id, quantity):
    pipe = r.pipeline()
    try:
        # 开启事务
        pipe.multi()
        # 扣减库存
        stock_key = f'product:{product_id}:stock'
        pipe.decrby(stock_key, quantity)
        # 获取当前库存
        current_stock = pipe.get(stock_key)
        if int(current_stock) < 0:
            # 库存不足,回滚事务
            pipe.discard()
            return False
        # 生成订单消息
        order_message = {
            'user_id': user_id,
            'product_id': product_id,
            'quantity': quantity
        }
        # 发送订单消息到队列
        channel.basic_publish(exchange='', routing_key='order_queue', body=str(order_message))
        # 提交事务
        pipe.execute()
        return True
    except Exception as e:
        print(f"下单过程出错: {e}")
        pipe.discard()
        return False
  1. 订单记录生成逻辑
import json
import pymysql

def process_order_message():
    connection = pymysql.connect(host='localhost', user='root', password='password', db='ecommerce', charset='utf8mb4')
    cursor = connection.cursor()
    def callback(ch, method, properties, body):
        order_message = json.loads(body)
        try:
            # 生成订单记录
            sql = "INSERT INTO orders (user_id, product_id, quantity) VALUES (%s, %s, %s)"
            cursor.execute(sql, (order_message['user_id'], order_message['product_id'], order_message['quantity']))
            connection.commit()
        except Exception as e:
            print(f"生成订单记录失败: {e}")
            # 重新发送消息到队列
            channel.basic_publish(exchange='', routing_key='order_queue', body=body)
            # Redis 事务补偿
            r = redis.Redis(host='localhost', port=6379, db = 0)
            pipe = r.pipeline()
            pipe.multi()
            failed_order_key = 'failed_orders'
            pipe.hset(failed_order_key, order_message['order_id'], body)
            stock_key = f'product:{order_message["product_id"]}:stock'
            pipe.incrby(stock_key, order_message['quantity'])
            pipe.execute()
    channel.basic_consume(queue='order_queue', on_message_callback=callback, auto_ack=True)
    channel.start_consuming()

上述代码只是一个简单的实现思路示例,实际应用中还需要考虑更多的细节,如异常处理、消息队列的可靠性、数据库连接池等。