MST

星途 面试题库

面试题:消息队列Kafka消息存储格式扩展

假设业务需求要求在Kafka现有消息存储格式基础上,添加自定义元数据字段,且要保证不影响原有Kafka的性能、兼容性和扩展性,你会如何设计实现?
46.2万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试
  1. 设计自定义元数据格式
    • 定义一种紧凑且可扩展的格式来存储自定义元数据。例如,可以使用JSON或Protobuf。
      • JSON:它具有良好的可读性和广泛的工具支持。示例:
{
    "custom_field1": "value1",
    "custom_field2": 123
}
 - **Protobuf**:生成的代码紧凑,序列化和反序列化速度快,适合性能敏感场景。定义.proto文件如下:
syntax = "proto3";

message CustomMetadata {
    string custom_field1 = 1;
    int32 custom_field2 = 2;
}
  1. 集成到Kafka消息格式
    • 使用消息头(Headers):Kafka消息本身支持消息头,可以将自定义元数据存储在消息头中。
      • 在生产者端,当发送消息时,将自定义元数据添加到消息头。以Java为例,使用Kafka生产者:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomMetadataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "test-topic";
        String key = "key1";
        String value = "message value";

        // 假设使用JSON格式存储自定义元数据
        String customMetadataJson = "{\"custom_field1\":\"value1\",\"custom_field2\":123}";
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("custom - metadata", customMetadataJson.getBytes()));

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value, headers);
        producer.send(record);
        producer.close();
    }
}
 - 在消费者端,从消息头中提取自定义元数据。同样以Java为例:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

import java.util.Arrays;
import java.util.Properties;

public class CustomMetadataConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test - group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test - topic"));

        while (true) {
            ConsumerRecord<String, String> record = consumer.poll(100).iterator().next();
            Header customMetadataHeader = record.headers().lastHeader("custom - metadata");
            if (customMetadataHeader!= null) {
                String customMetadataJson = new String(customMetadataHeader.value());
                // 处理自定义元数据
                System.out.println("Custom Metadata: " + customMetadataJson);
            }
        }
    }
}
  • 扩展消息体(Value):如果消息头空间不够或有其他特殊需求,可以考虑在消息体中预留一段区域存储自定义元数据。
    • 生产者端:将自定义元数据和原有消息内容按照一定格式组合。例如,先写自定义元数据长度,再写自定义元数据,最后写原有消息内容。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.ByteBuffer;
import java.util.Properties;

public class CustomMetadataProducerWithValue {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        String topic = "test - topic";
        String key = "key1";
        String originalValue = "message value";

        // 假设使用JSON格式存储自定义元数据
        String customMetadataJson = "{\"custom_field1\":\"value1\",\"custom_field2\":123}";
        byte[] customMetadataBytes = customMetadataJson.getBytes();
        byte[] originalValueBytes = originalValue.getBytes();

        ByteBuffer buffer = ByteBuffer.allocate(4 + customMetadataBytes.length + originalValueBytes.length);
        buffer.putInt(customMetadataBytes.length);
        buffer.put(customMetadataBytes);
        buffer.put(originalValueBytes);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, buffer.array());
        producer.send(record);
        producer.close();
    }
}
 - 消费者端:按照相同格式解析消息体,先读取自定义元数据长度,再读取自定义元数据,最后读取原有消息内容。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Properties;

public class CustomMetadataConsumerWithValue {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test - group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test - topic"));

        while (true) {
            ConsumerRecord<String, byte[]> record = consumer.poll(100).iterator().next();
            byte[] value = record.value();
            ByteBuffer buffer = ByteBuffer.wrap(value);
            int customMetadataLength = buffer.getInt();
            byte[] customMetadataBytes = new byte[customMetadataLength];
            buffer.get(customMetadataBytes);
            byte[] originalValueBytes = new byte[buffer.remaining()];
            buffer.get(originalValueBytes);

            String customMetadataJson = new String(customMetadataBytes);
            String originalValue = new String(originalValueBytes);

            // 处理自定义元数据和原有消息
            System.out.println("Custom Metadata: " + customMetadataJson);
            System.out.println("Original Value: " + originalValue);
        }
    }
}
  1. 兼容性考虑
    • 版本控制:如果自定义元数据格式可能发生变化,引入版本号。可以在消息头或消息体预留位置存储版本号。例如,在消息头添加一个metadata - version字段。
    • 向后兼容:确保旧版本的消费者在遇到带有新自定义元数据的消息时,能够正常处理原有消息内容,忽略不认识的自定义元数据。
  2. 性能优化
    • 避免频繁序列化/反序列化:如果使用JSON,尽量在关键路径(如消息处理循环)外进行解析和生成操作。对于Protobuf,利用其高效的序列化和反序列化机制。
    • 批量处理:在生产者端和消费者端,尽量批量处理消息,减少I/O和网络开销。
  3. 扩展性设计
    • 模块化:将自定义元数据的处理逻辑封装成独立模块,便于后续添加新的自定义字段或修改处理逻辑。
    • 可插拔式设计:设计成可插拔的架构,使得不同的业务可以根据需求灵活启用或禁用自定义元数据功能,以及替换不同的自定义元数据处理实现。