MST

星途 面试题库

面试题:Redis分布式锁唯一标识防重入漏洞防范

假设在一个基于Redis的分布式锁应用场景中,存在重入问题导致唯一标识安全性受损,你会如何设计方案来防范这一漏洞?请详细阐述设计思路和代码实现要点。
44.8万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 添加标识:在获取锁时,不仅使用锁的键,还关联当前线程或客户端的唯一标识(如线程ID、客户端ID等)。当释放锁时,确保释放的是当前持有锁的标识对应的锁,避免误释放其他线程的锁。
  2. 重入计数:对于同一个线程或客户端多次获取同一把锁的情况,增加重入计数。每次获取锁时计数加1,每次释放锁时计数减1,当计数为0时才真正释放锁。

代码实现要点(以Python和Redis-py为例)

  1. 获取锁
import redis
import threading

r = redis.Redis(host='localhost', port=6379, db=0)

def acquire_lock(lock_key, identifier, acquire_timeout=10):
    retry_count = 0
    while retry_count < acquire_timeout:
        # 使用SETNX命令尝试获取锁,并设置锁的值为当前标识
        result = r.setnx(lock_key, identifier)
        if result:
            # 获取锁成功,初始化重入计数为1
            r.hset(lock_key + '_reentry', identifier, 1)
            return True
        else:
            # 检查是否为当前标识持有锁,如果是则增加重入计数
            current_identifier = r.get(lock_key)
            if current_identifier.decode('utf-8') == identifier:
                current_reentry = r.hget(lock_key + '_reentry', identifier)
                if current_reentry is None:
                    r.hset(lock_key + '_reentry', identifier, 1)
                else:
                    r.hincrby(lock_key + '_reentry', identifier, 1)
                return True
        retry_count += 1
    return False
  1. 释放锁
def release_lock(lock_key, identifier):
    # 获取当前锁的持有标识
    current_identifier = r.get(lock_key)
    if current_identifier is not None and current_identifier.decode('utf-8') == identifier:
        # 获取当前标识的重入计数
        current_reentry = r.hget(lock_key + '_reentry', identifier)
        if current_reentry is not None and int(current_reentry) == 1:
            # 重入计数为1,删除锁及重入计数记录
            pipe = r.pipeline()
            pipe.delete(lock_key)
            pipe.delete(lock_key + '_reentry')
            pipe.execute()
        elif current_reentry is not None:
            # 重入计数大于1,减少重入计数
            r.hincrby(lock_key + '_reentry', identifier, -1)
        return True
    return False
  1. 使用示例
lock_key = 'example_lock'
identifier = str(threading.get_ident())

if acquire_lock(lock_key, identifier):
    try:
        # 业务逻辑
        print('Lock acquired, doing business logic...')
    finally:
        release_lock(lock_key, identifier)
else:
    print('Failed to acquire lock')