队列满溢异常处理
- Kafka:
- 生产者端:
- Kafka生产者可以设置
acks
参数,比如设置为 all
以确保消息被完全确认。当队列满溢时,生产者会收到 TimeoutException
等异常。此时可以在生产者的 onCompletion
回调方法中捕获异常,进行重试逻辑。例如:
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 记录日志
logger.error("消息发送失败", exception);
// 重试逻辑
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
producer.send(record).get();
break;
} catch (Exception e) {
retryCount++;
logger.error("第{}次重试发送消息失败", retryCount, e);
}
}
}
});
- 还可以通过调整 `buffer.memory` 参数来增加生产者缓冲区大小,减少队列满溢的概率。
- 消费者端:
- Kafka消费者一般不会直接面临队列满溢问题,因为它是从分区拉取消息。但如果消费者处理速度过慢,可能导致分区“积压”看起来像队列满溢。此时可以增加消费者实例数量,通过调整
num.partitions
来更好地分配负载。
- RabbitMQ:
- 生产者端:
- RabbitMQ生产者在发送消息时,如果队列满溢会收到
Channel.Close
异常。可以在捕获异常后,将消息暂存到本地缓存(如 ConcurrentHashMap
),同时启动一个定时任务,每隔一段时间尝试重新发送缓存中的消息。例如:
try {
channel.basicPublish("", "queueName", null, "message".getBytes("UTF-8"));
} catch (IOException e) {
if (e instanceof Channel.Close) {
// 暂存消息到本地缓存
localCache.put(System.currentTimeMillis(), "message");
// 启动定时任务重新发送
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
for (Map.Entry<Long, String> entry : localCache.entrySet()) {
try {
channel.basicPublish("", "queueName", null, entry.getValue().getBytes("UTF-8"));
localCache.remove(entry.getKey());
} catch (IOException ex) {
logger.error("重新发送消息失败", ex);
}
}
}, 0, 1, TimeUnit.SECONDS);
}
}
- 消费者端:
- RabbitMQ消费者可以使用
basic.qos
方法设置预取计数(prefetchCount
),避免一次性接收过多消息导致处理不过来。如果处理过程中发现队列满溢,可以暂停消费一段时间,等队列压力减小后再恢复。例如:
channel.basicQos(10); // 设置预取计数为10
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// 处理消息
processMessage(new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 记录日志
logger.error("处理消息失败", e);
// 暂停消费10秒
Thread.sleep(10000);
}
}
};
channel.basicConsume("queueName", false, consumer);
消息积压导致的客户端处理异常
- Kafka:
- 消费者端:
- 增加消费者实例数量,通过调整
consumer.group.id
来控制消费组。每个消费组内的消费者实例会自动分配分区进行消费。例如在Java中配置消费者时:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "myGroup");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));
- 优化消费者的消息处理逻辑,确保处理速度尽可能快。比如使用多线程处理消息,但要注意线程安全问题。可以使用线程池来管理线程,例如:
ExecutorService executorService = Executors.newFixedThreadPool(10);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executorService.submit(() -> {
// 处理消息
processMessage(record.value());
});
}
- RabbitMQ:
- 消费者端:
- 同样可以增加消费者实例数量来加快消息处理速度。在Java中可以启动多个消费者线程,每个线程连接到RabbitMQ服务器并消费消息。
- 对于消息处理逻辑,可以将复杂的业务逻辑拆分,异步处理一些非关键操作。例如,如果消息处理涉及数据库操作和发送邮件操作,可以先完成数据库操作并确认消息,然后异步发送邮件。
// 主线程处理数据库操作
channel.basicConsume("queueName", false, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 数据库操作
databaseOperation(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 异步发送邮件
CompletableFuture.runAsync(() -> sendEmail(message));
} catch (Exception e) {
logger.error("处理消息失败", e);
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
}, consumerTag -> {});