面试题答案
一键面试基于Redis消息队列实现延迟消息机制
- 使用Sorted Set(有序集合):
- 添加消息:当用户发布动态时,计算1小时后的时间戳
score
。将消息相关信息(如动态ID等)作为member
,以(member, score)
的形式添加到Redis的Sorted Set中。例如:
import redis import time r = redis.Redis(host='localhost', port=6379, db = 0) dynamic_id = 12345 delay_time = 3600 # 1小时 future_time = int(time.time()) + delay_time r.zadd('delayed_messages', {dynamic_id: future_time})
- 取出消息:使用一个定时任务(如Linux的
cron
或Python的APScheduler
),定期(如每隔1分钟)检查Sorted Set中score
小于等于当前时间戳的元素。例如:
current_time = int(time.time()) messages = r.zrangebyscore('delayed_messages', 0, current_time) for message in messages: # 处理消息,如推送热度提醒 r.zrem('delayed_messages', message)
- 添加消息:当用户发布动态时,计算1小时后的时间戳
- 使用Redis Stream结合XADD的
MAXLEN
选项实现延迟队列:- 添加消息:同样计算1小时后的时间戳。使用
XADD
命令添加消息到Stream中,并设置MAXLEN
选项,确保队列不会无限增长。例如:
XADD delayed_stream MAXLEN ~ 10000 * dynamic_id <timestamp>
- 取出消息:使用
XREADGROUP
命令结合COUNT
和BLOCK
选项,阻塞读取Stream中的消息。但需要额外逻辑判断消息是否已到延迟时间。
- 添加消息:同样计算1小时后的时间戳。使用
与MySQL数据库交互时确保数据准确和完整性的注意事项
- 事务处理:
- 在涉及数据库操作时,使用事务来确保一组操作要么全部成功,要么全部失败。例如,在Python中使用
pymysql
库:
import pymysql conn = pymysql.connect(host='localhost', user='user', password='password', database='social_db') try: with conn.cursor() as cursor: # 开启事务 conn.begin() # 数据库操作,如插入动态相关数据 cursor.execute("INSERT INTO dynamics (id, content) VALUES (%s, %s)", (dynamic_id, 'example content')) # 提交事务 conn.commit() except Exception as e: # 回滚事务 conn.rollback() raise e finally: conn.close()
- 在涉及数据库操作时,使用事务来确保一组操作要么全部成功,要么全部失败。例如,在Python中使用
- 数据一致性检查:
- 在从MySQL读取数据时,要确保数据的一致性。例如,在读取动态热度相关数据时,要考虑到可能存在未提交的事务对数据的影响。可以使用
SELECT... FOR UPDATE
语句来锁定相关行,确保读取到的数据是最新且一致的。例如:
SELECT heat_score FROM dynamics WHERE id = 12345 FOR UPDATE;
- 在从MySQL读取数据时,要确保数据的一致性。例如,在读取动态热度相关数据时,要考虑到可能存在未提交的事务对数据的影响。可以使用
- 数据验证:
- 在将数据写入MySQL之前,要对数据进行严格的验证。例如,检查动态内容长度是否符合数据库表字段的设定,确保动态ID是唯一且合法的等。防止非法数据写入数据库导致数据不准确或完整性被破坏。
- 并发控制:
- 由于社交互动系统可能有高并发的读写操作,要使用合适的锁机制。除了上述的
SELECT... FOR UPDATE
,还可以在更新操作时使用乐观锁。例如,表结构中添加version
字段,每次更新时检查version
并递增:
UPDATE dynamics SET heat_score = heat_score + 1, version = version + 1 WHERE id = 12345 AND version = <current_version>;
- 检查更新的影响行数,如果为0,则表示数据在更新前已被其他事务修改,需要重新读取并操作。
- 由于社交互动系统可能有高并发的读写操作,要使用合适的锁机制。除了上述的