Strategy 1: Bucketing
- Principle: Spread user IDs across multiple Redis keys by a fixed rule (e.g. modulo on the user ID), so that no single key's bitmap grows too large. Smaller keys keep per-key memory bounded and make bit operations such as BITOP/BITCOUNT cheaper.
- Example implementation (Python):

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

BUCKET_NUM = 100  # number of buckets

def sign(user_id, date):
    """Record a sign-in for user_id on the given date."""
    # user_id % N picks the bucket and user_id // N picks the bit inside it,
    # so every user maps to a unique (bucket, offset) pair and two users in
    # the same bucket can never collide on the same bit.
    bucket = user_id % BUCKET_NUM
    offset = user_id // BUCKET_NUM
    key = f'sign:{bucket}:{date}'
    r.setbit(key, offset, 1)

def is_signed(user_id, date):
    """Return True if user_id has signed in on the given date."""
    bucket = user_id % BUCKET_NUM
    offset = user_id // BUCKET_NUM
    key = f'sign:{bucket}:{date}'
    return bool(r.getbit(key, offset))
```
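That the bucket/offset split above is collision-free can be checked without a Redis server: the pair `(user_id % N, user_id // N)` is invertible, so every user owns a distinct bit. A minimal sketch, assuming numeric user IDs and the same `BUCKET_NUM` as in `sign()`:

```python
BUCKET_NUM = 100  # must match the value used in sign()/is_signed()

def locate(user_id):
    """Map a numeric user_id to its (bucket, bit offset) pair."""
    return user_id % BUCKET_NUM, user_id // BUCKET_NUM

def user_from(bucket, offset):
    """Invert locate(): recover the user_id that owns this bucket/offset."""
    return offset * BUCKET_NUM + bucket
```

Because `locate()` is invertible, no two user IDs ever write to the same bit; a naive mapping that uses `% BUCKET_NUM` for both the bucket and the offset would collide (e.g. users 5 and 105).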
Strategy 2: Batch operations
- Principle: Instead of issuing bit commands one at a time, merge many SETBIT/GETBIT calls into a single pipelined batch, cutting per-command network round trips to Redis.
- Example implementation (Python):

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def batch_sign(user_ids, date):
    """Set the sign-in bit for many users in one network round trip."""
    key = f'sign:{date}'
    pipe = r.pipeline()
    for user_id in user_ids:
        pipe.setbit(key, user_id, 1)  # user_id doubles as the bit offset
    pipe.execute()

def batch_is_signed(user_ids, date):
    """Return a list of 0/1 flags, one per user, in input order."""
    key = f'sign:{date}'
    pipe = r.pipeline()
    for user_id in user_ids:
        pipe.getbit(key, user_id)
    return pipe.execute()
```
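The list returned by `batch_is_signed` is in the same order as `user_ids`, so pairing results back with users is a simple zip. A hypothetical helper to illustrate:

```python
def filter_signed(user_ids, flags):
    """Keep the user IDs whose GETBIT result was 1, preserving order.

    `flags` is the list of 0/1 values returned by batch_is_signed(),
    which the pipeline emits in the same order as the input user_ids.
    """
    return [uid for uid, flag in zip(user_ids, flags) if flag]
```

For very large batches it can also help to call `pipe.execute()` every few thousand commands rather than buffering the whole batch at once.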
Strategy 3: Periodically purge stale data
- Principle: Historical sign-in data that is no longer needed can be deleted on a schedule, preventing unbounded accumulation from degrading performance.
- Example implementation (Python):

```python
import redis
import datetime

r = redis.Redis(host='localhost', port=6379, db=0)

def clean_old_data(days_to_keep=30):
    """Delete sign-in keys older than days_to_keep days."""
    cutoff = datetime.date.today() - datetime.timedelta(days=days_to_keep)
    # scan_iter walks the keyspace incrementally; KEYS would block Redis
    # on a large dataset.
    for key in r.scan_iter('sign:*'):
        # The date is the last ':'-separated segment, which works for both
        # sign:{date} and sign:{bucket}:{date} key layouts.
        date_str = key.decode('utf-8').split(':')[-1]
        try:
            key_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
        except ValueError:
            continue  # not a dated key; leave it alone
        if key_date < cutoff:
            r.delete(key)
```
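The retention decision can be factored into a pure predicate, which makes it easy to unit-test separately from the Redis scan. A sketch, assuming the same `sign:{date}` / `sign:{bucket}:{date}` key naming and `%Y-%m-%d` dates:

```python
import datetime

def is_expired(key: str, today: datetime.date, days_to_keep: int = 30) -> bool:
    """True if the key's trailing date segment is past the retention window."""
    date_str = key.split(':')[-1]
    try:
        key_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
    except ValueError:
        return False  # not a dated key; never delete it
    return key_date < today - datetime.timedelta(days=days_to_keep)
```

A lighter-weight alternative is to call `r.expire(key, ttl)` when a day's key is first written, letting Redis evict old bitmaps automatically without any scanning job.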