面试题答案
一键面试避免或处理消息重复消费的措施
- 使用消息唯一标识:为每条消息生成一个唯一的ID,例如使用UUID。消费者在处理消息前,先检查该ID是否已经处理过。
- 利用Redis的原子操作:借助Redis的SETNX(SET if Not eXists)命令,当消费者获取到消息时,以消息ID为键,尝试在Redis中设置一个值。如果设置成功,说明该消息尚未处理,消费者可以继续处理;如果设置失败,说明该消息已被处理过,直接丢弃。
- 消费确认机制:消费者在成功处理消息后,向Redis发送一个确认消息,标记该消息已被成功消费。下次消费时,先检查确认消息。
- 幂等性处理:设计业务逻辑时,保证对同一消息的多次处理结果相同。例如,数据库插入操作可以使用
INSERT IGNORE
或ON DUPLICATE KEY UPDATE
语句。
Python代码实现(使用消息唯一标识 + Redis SETNX)
import redis
import uuid
def consume_message(redis_client, queue_key):
# 从队列中获取消息
message = redis_client.rpop(queue_key)
if message is None:
return
# 假设消息格式为 (message_id, actual_message)
message_id, actual_message = message.decode('utf-8').split(':', 1)
# 使用SETNX检查消息是否已处理
if redis_client.setnx(f'processed:{message_id}', '1'):
try:
# 处理消息
print(f"Processing message: {actual_message}")
# 模拟业务处理
# 这里可以放置实际的业务逻辑代码
finally:
# 无论处理成功与否,删除已处理标记
redis_client.delete(f'processed:{message_id}')
else:
print(f"Duplicate message detected: {actual_message}, skipped.")
if __name__ == "__main__":
r = redis.Redis(host='localhost', port=6379, db=0)
queue_key = 'your_queue_key'
# 模拟生产者向队列中添加消息
for _ in range(5):
message_id = str(uuid.uuid4())
actual_message = f"Sample message {_}"
r.lpush(queue_key, f"{message_id}:{actual_message}")
# 模拟消费者消费消息
for _ in range(5):
consume_message(r, queue_key)
在上述代码中:
consume_message
函数从Redis队列中获取消息。- 消息格式为
message_id:actual_message
,通过split
方法分离。 - 使用
redis_client.setnx
检查消息是否已处理,如果未处理则处理消息,并在处理完成后删除已处理标记。 - 如果
setnx
返回False
,说明消息已被处理,直接跳过。