import redis
def transfer(redis_client, from_account, to_account, amount):
pipe = redis_client.pipeline()
lock_key = f"transfer:{from_account}:{to_account}"
try:
# 获取锁
while not redis_client.set(lock_key, "locked", nx=True, ex=10):
pass
# 开启事务
pipe.multi()
# 获取账户A余额
from_balance = pipe.get(from_account)
if from_balance is None or int(from_balance) < amount:
raise ValueError("Insufficient funds")
# 账户A扣钱
pipe.decrby(from_account, amount)
# 账户B加钱
pipe.incrby(to_account, amount)
# 执行事务
pipe.execute()
finally:
# 释放锁
redis_client.delete(lock_key)
if __name__ == "__main__":
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('accountA', 1000)
r.set('accountB', 500)
transfer(r, 'accountA', 'accountB', 200)
代码解释
- 锁机制:
- 使用
redis_client.set(lock_key, "locked", nx=True, ex=10)
来获取锁。nx=True
表示只有当lock_key
不存在时才设置成功,这样可以保证同一时间只有一个进程或线程能够获取到锁。ex=10
设置锁的过期时间为10秒,防止因程序异常而导致锁永远不释放。
- 在
while
循环中不断尝试获取锁,直到获取成功为止。
- 在
finally
块中使用redis_client.delete(lock_key)
来释放锁,确保无论事务执行成功与否,锁都会被释放。
- 事务机制:
- 使用
pipe = redis_client.pipeline()
创建一个管道对象。
- 使用
pipe.multi()
开启一个事务,在这之后的所有命令都会被放入队列中,并不会立即执行。
- 依次将获取账户A余额、账户A扣钱、账户B加钱的操作添加到事务队列中。
- 使用
pipe.execute()
执行事务,这会将队列中的所有命令一次性发送到Redis服务器执行,保证了这些操作的原子性,从而确保数据一致性。如果在事务执行过程中出现错误,Redis会自动回滚事务,不会对数据造成部分修改的情况。