面试题答案
一键面试设计方案
- 消息发送记录:在生产者端,维护一个消息发送记录表,记录每条消息的唯一标识(如消息ID)、发送状态(成功/失败)、发送时间等信息。这个表可以存储在关系型数据库(如MySQL)或分布式存储(如Redis)中。
- 定时任务检查:利用定时任务(如Spring Task、Quartz等),定期检查消息发送记录表中发送失败的消息。检查周期可以根据实际情况调整,比如每隔1分钟检查一次。
- 幂等性处理:
- 业务层面:在消费者端,设计业务逻辑时确保对于相同消息的多次处理结果是一致的。例如,在更新数据库操作中,使用唯一索引避免重复插入相同数据。
- 消息层面:RocketMQ提供了消息幂等性支持,通过消息的唯一标识(Message ID)来判断是否为重复消息。消费者在处理消息前,可以先根据消息ID查询本地是否已经处理过该消息。
- 重发策略:
- 固定次数重发:设定一个最大重发次数,如3次。每次重发间隔可以适当延长,例如第一次重发间隔10秒,第二次间隔30秒,第三次间隔1分钟。
- 动态重发:根据网络抖动的情况动态调整重发策略。例如,可以通过监控网络状况,当网络抖动较严重时,增加重发间隔和最大重发次数。
代码示例(Java + RocketMQ + MySQL + Spring Boot)
- 引入依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-task</artifactId>
</dependency>
</dependencies>
- 配置文件(application.properties):
spring.datasource.url=jdbc:mysql://localhost:3306/your_database
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
rocketmq.name-server=localhost:9876
rocketmq.producer.group=test-group
- 消息发送记录表实体类(MessageSendRecord.java):
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@Entity
public class MessageSendRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String messageId;
private String topic;
private String status;
private long sendTime;
// 省略getter和setter方法
}
- 消息发送记录仓储接口(MessageSendRecordRepository.java):
import org.springframework.data.jpa.repository.JpaRepository;
public interface MessageSendRecordRepository extends JpaRepository<MessageSendRecord, Long> {
MessageSendRecord findByMessageId(String messageId);
}
- RocketMQ生产者配置(RocketMQConfig.java):
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RocketMQConfig {
@Bean
public RocketMQTemplate rocketMQTemplate() {
return new RocketMQTemplate();
}
}
- 消息生产者服务(MessageProducerService.java):
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) {
try {
rocketMQTemplate.syncSend(topic, message);
// 发送成功记录到数据库
} catch (Exception e) {
// 发送失败记录到数据库
}
}
}
- 定时任务重发消息(MessageResendTask.java):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class MessageResendTask {
@Autowired
private MessageSendRecordRepository messageSendRecordRepository;
@Autowired
private MessageProducerService messageProducerService;
@Scheduled(cron = "0 */1 * * * *") // 每分钟执行一次
public void resendFailedMessages() {
// 查询发送失败的消息
messageSendRecordRepository.findByStatus("FAILED").forEach(record -> {
try {
messageProducerService.sendMessage(record.getTopic(), "");
// 更新发送状态为成功
} catch (Exception e) {
// 增加重发次数,超过最大重发次数则标记为永久失败
}
});
}
}
- 消费者端幂等性处理(MessageConsumer.java):
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-group")
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 根据消息ID检查是否已处理过
// 处理业务逻辑
}
}
通过以上设计方案和代码示例,可以实现一个可靠的消息重发机制,确保消息最终一致性且避免重复消费。