面试题答案
一键面试Broker 端保障消息幂等性
- 原理:
- RocketMQ Broker 端通过
MessageId
来保证消息的幂等性。当 Producer 发送消息时,会生成唯一的MessageId
。Broker 在存储消息时,会检查该MessageId
是否已经存在。如果存在,则不会重复存储该消息。 - 具体源码实现中,在
DefaultMessageStore#putMessage
方法里,CommitLog
在追加消息时,会通过MessageId
相关逻辑来避免重复存储。例如,CommitLog
会将消息写入文件,同时在内存数据结构中记录已处理的MessageId
,当新消息写入时会先检查。
- RocketMQ Broker 端通过
- 优势:
- 这种方式能够在 Broker 端有效防止消息的重复存储,避免因网络等原因导致 Producer 重复发送消息时,Broker 产生重复消息。
- 不足:
- 对于分布式环境下不同 Broker 节点间的一致性维护可能存在挑战,虽然 RocketMQ 通过主从架构等一定程度缓解,但极端情况下仍可能出现问题。
Consumer 端保障消息幂等性
- 原理:
- Consumer 端通过
MessageId
或业务唯一键来实现幂等性。Consumer 在处理消息时,会根据MessageId
或者业务逻辑中自定义的唯一键(例如订单号等)来判断消息是否已经处理过。 - 在源码层面,以
DefaultMQPushConsumer
为例,当接收到消息进行消费时,业务代码中可以通过记录已处理的MessageId
或者唯一键到外部存储(如数据库、Redis 等),每次消费前查询是否已处理。例如,在MessageListenerConcurrently
的实现类中,业务处理逻辑里可以添加幂等性判断逻辑。
- Consumer 端通过
- 优势:
- 能够在 Consumer 端进一步确保即使 Broker 端出现少量重复消息,也不会对业务产生重复影响。通过业务唯一键还能结合业务场景灵活处理幂等性。
- 不足:
- 增加了额外的存储查询开销,特别是高并发场景下,频繁查询外部存储可能导致性能瓶颈。同时,对业务代码侵入较大,需要在每个消费逻辑中添加幂等性判断。
高并发场景下幂等性处理机制性能瓶颈优化
- 优化方面:
- 减少外部存储查询次数:频繁查询数据库或 Redis 等外部存储会带来网络 I/O 开销,是性能瓶颈之一。
- 优化数据结构:现有的记录已处理消息的数据结构在高并发下可能存在竞争问题,影响性能。
- 异步处理:当前幂等性判断多为同步操作,在高并发下会阻塞消费线程。
- 优化思路及方案:
- 缓存优化:
- 思路:在 Consumer 端增加本地缓存,如 Guava Cache。先在本地缓存中查询消息是否已处理,若未命中再查询外部存储。
- 方案:在
MessageListenerConcurrently
实现类中,初始化 Guava Cache,例如:LoadingCache<String, Boolean> cache = CacheBuilder.newBuilder().maximumSize(10000).build(new CacheLoader<String, Boolean>() { @Override public Boolean load(String key) throws Exception { // 查询外部存储判断是否已处理 return externalStorageCheck(key); } });
,在处理消息时,先cache.get(message.getMessageId())
查询本地缓存。
- 数据结构优化:
- 思路:使用更适合高并发的无锁数据结构,如 ConcurrentHashMap 替代普通 HashMap 来记录已处理消息。
- 方案:在 Consumer 端维护一个
ConcurrentHashMap<String, Boolean>
来记录已处理的MessageId
,在消费消息前通过concurrentHashMap.containsKey(message.getMessageId())
判断。
- 异步处理:
- 思路:将幂等性判断操作异步化,不阻塞消费线程。
- 方案:可以使用 CompletableFuture 或线程池来实现异步处理。例如,在
MessageListenerConcurrently
实现类中,提交幂等性判断任务到线程池ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { // 幂等性判断逻辑 return idempotencyCheck(message.getMessageId()); }, executor); future.thenAccept(result -> { if (result) { // 处理消息逻辑 } });
,这样消费线程不会被幂等性判断阻塞。
- 缓存优化: