系统架构层面
- 消息发送重试机制:
- 在发送端,当出现网络闪断等异常导致消息发送失败时,开启重试逻辑。可以设置重试次数,比如3 - 5次。若多次重试后仍失败,则记录相关日志并通知运维人员。
- 实现方式:在发送消息的代码块中,通过try - catch捕获异常,在catch块中进行重试逻辑。例如:
for (int i = 0; i < maxRetryCount; i++) {
try {
SendResult sendResult = producer.send(message);
// 处理发送成功的逻辑
break;
} catch (Exception e) {
if (i == maxRetryCount - 1) {
// 记录日志,通知运维等
}
}
}
- 多Broker部署与主从架构:
- 部署多个Broker节点,并采用主从架构。主Broker负责接收和处理消息,从Broker实时同步主Broker的数据。当主Broker出现故障时,从Broker可以切换为主Broker继续提供服务。
- 配置方式:在RocketMQ的配置文件中,配置主从关系和Broker集群信息。例如,在
broker.conf
文件中设置brokerId
,0表示主Broker,大于0表示从Broker,并配置好主从之间的数据同步方式。
- 消息持久化:
- 确保Broker将接收到的消息持久化到磁盘。RocketMQ支持多种持久化方式,如异步刷盘和同步刷盘。同步刷盘能保证消息的可靠性,但性能相对较低;异步刷盘性能较高,但在极端情况下可能会丢失少量未刷盘的消息。
- 配置:在
broker.conf
文件中,通过设置flushDiskType = SYNC_FLUSH
或ASYNC_FLUSH
来选择刷盘方式。对于可靠性要求极高的场景,选择同步刷盘;对于性能要求较高且能接受少量消息丢失风险的场景,选择异步刷盘。
- 引入消息中间件集群的负载均衡:
- 采用负载均衡器(如Nginx等)将发送端和接收端的请求均匀分配到各个Broker节点上,避免单个Broker节点压力过大。同时,RocketMQ自身也提供了一定的负载均衡机制,如生产者根据负载均衡策略选择队列进行消息发送。
- 配置负载均衡器:以Nginx为例,在Nginx配置文件中设置代理服务器,将请求转发到不同的RocketMQ Broker节点。例如:
upstream rocketmq_cluster {
server broker1_ip:port;
server broker2_ip:port;
server broker3_ip:port;
}
server {
listen 80;
location / {
proxy_pass http://rocketmq_cluster;
}
}
代码实现层面
- 事务消息:
- 对于一些对业务一致性要求较高的场景,使用RocketMQ的事务消息。发送端先发送半消息,Broker接收成功后,发送端执行本地事务,根据本地事务执行结果决定提交或回滚半消息。
- 代码示例:
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务逻辑
try {
// 业务操作
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态逻辑
return LocalTransactionState.COMMIT_MESSAGE;
}
});
Message message = new Message("topic", "tags", "keys", "body".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(message, null);
- 消费端幂等性处理:
- 由于消息可能会重复消费,消费端需要实现幂等性。可以通过为每个消息生成唯一标识(如UUID),在消费时将消息标识记录到数据库或缓存中,每次消费前先检查该标识是否已被消费过。
- 代码示例:
public class Consumer {
private static final Set<String> consumedMessageIds = Collections.synchronizedSet(new HashSet<>());
public void consume(MessageExt message) {
String messageId = message.getMsgId();
if (consumedMessageIds.contains(messageId)) {
return;
}
consumedMessageIds.add(messageId);
// 处理消息逻辑
}
}
性能优化
- 优化消息发送重试机制:
- 采用指数退避策略,即每次重试的间隔时间逐渐增加,避免短时间内大量重试请求对系统造成过大压力。例如,第一次重试间隔1秒,第二次间隔2秒,第三次间隔4秒等。
- 代码实现:
int retryInterval = 1000;
for (int i = 0; i < maxRetryCount; i++) {
try {
SendResult sendResult = producer.send(message);
break;
} catch (Exception e) {
if (i < maxRetryCount - 1) {
try {
Thread.sleep(retryInterval);
retryInterval = retryInterval * 2;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
- 合理配置刷盘策略:
- 对于异步刷盘,可以适当调整刷盘线程池大小和刷盘间隔时间,在保证一定可靠性的前提下提高性能。例如,增加刷盘线程池大小可以加快消息刷盘速度,但可能会占用更多系统资源。可以通过测试不同的配置参数,找到性能和可靠性的平衡点。
- 配置示例:在
broker.conf
文件中设置flushInterval=5000
(表示刷盘间隔为5秒),flushThreadPoolSize = 16
(表示刷盘线程池大小为16)。
- 优化消费端幂等性检查:
- 使用缓存(如Redis)进行幂等性检查,因为缓存的读写速度比数据库快。可以将消息标识存储在Redis的Set数据结构中,利用Redis的高效查找能力快速判断消息是否已被消费。同时,合理设置缓存的过期时间,避免缓存占用过多内存。
- 代码示例:
public class Consumer {
private Jedis jedis;
public Consumer() {
jedis = new Jedis("localhost", 6379);
}
public void consume(MessageExt message) {
String messageId = message.getMsgId();
if (jedis.sismember("consumed_messages", messageId)) {
return;
}
jedis.sadd("consumed_messages", messageId);
// 处理消息逻辑
}
}