面试题答案
一键面试1. 使用唯一ID和去重表
- 策略描述:为每条消息生成一个唯一的ID,消费者在处理消息前,先查询数据库中的去重表,看该ID是否已处理过。如果已处理过,则直接丢弃该消息;否则,处理消息并将该ID插入去重表。
- 具体实现方式(以Java和MySQL为例):
// 假设使用Spring JDBC @Autowired private JdbcTemplate jdbcTemplate; public void processMessage(String messageId, String messageContent) { String selectSql = "SELECT COUNT(1) FROM idempotency_table WHERE message_id = ?"; int count = jdbcTemplate.queryForObject(selectSql, new Object[]{messageId}, Integer.class); if (count > 0) { // 已处理过,丢弃消息 return; } // 处理消息逻辑 handleMessage(messageContent); String insertSql = "INSERT INTO idempotency_table (message_id) VALUES (?)"; jdbcTemplate.update(insertSql, messageId); } private void handleMessage(String messageContent) { // 实际的消息处理逻辑 System.out.println("Processing message: " + messageContent); }
2. 状态机控制
- 策略描述:定义一个状态机,消息处理分为几个明确的状态,如“未处理”“处理中”“已处理”。消费者在处理消息时,先检查消息当前状态。如果是“未处理”,则将状态设为“处理中”,处理完消息后,将状态设为“已处理”。如果状态已经是“处理中”或“已处理”,则不再重复处理。
- 具体实现方式(以Python和Redis为例,使用Redis记录状态):
import redis r = redis.Redis(host='localhost', port=6379, db=0) def process_message(message_id, message_content): status = r.get(message_id) if status == b'processing' or status == b'processed': return r.set(message_id, 'processing') try: # 处理消息逻辑 handle_message(message_content) r.set(message_id, 'processed') except Exception as e: r.set(message_id, 'unprocessed') raise e def handle_message(message_content): print("Processing message: " + message_content)
3. 数据库唯一约束
- 策略描述:在数据库表中对关键业务字段设置唯一约束。当消费者处理消息并尝试插入或更新数据库记录时,如果违反唯一约束,说明该消息已经处理过,数据库会抛出异常,消费者捕获异常并忽略该消息。
- 具体实现方式(以SQL Server为例):
- 假设数据库表
order_table
中有order_no
字段作为唯一标识业务的字段,且设置了唯一约束。
CREATE TABLE order_table ( order_id INT IDENTITY(1,1) PRIMARY KEY, order_no VARCHAR(50) UNIQUE, order_amount DECIMAL(10, 2), -- 其他字段 );
- 在应用程序(如C#)中处理消息插入订单逻辑:
using System; using System.Data.SqlClient; class Program { static void Main() { string orderNo = "123456"; decimal orderAmount = 100.00m; string connectionString = "your_connection_string"; using (SqlConnection connection = new SqlConnection(connectionString)) { string insertSql = "INSERT INTO order_table (order_no, order_amount) VALUES (@orderNo, @orderAmount)"; SqlCommand command = new SqlCommand(insertSql, connection); command.Parameters.AddWithValue("@orderNo", orderNo); command.Parameters.AddWithValue("@orderAmount", orderAmount); try { connection.Open(); command.ExecuteNonQuery(); } catch (SqlException ex) when (ex.Number == 2627) { // 违反唯一约束,消息已处理过,忽略 Console.WriteLine("Message already processed."); } } } }
- 假设数据库表