MST

星途 面试题库

面试题:消息队列之Kafka Streams实时计算基础技巧

在Kafka Streams中,如何定义一个简单的流处理拓扑,实现对消息的过滤操作,仅保留满足特定条件(例如消息中某个字段大于100)的消息?请用代码示例说明,并简述主要步骤。
18.4万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试
  1. 主要步骤

    • 创建StreamsBuilder对象,用于构建流处理拓扑。
    • 使用streamsBuilder.stream()方法从Kafka主题读取流数据。
    • 对读取的流数据应用过滤操作,过滤出满足条件的消息。
    • 将过滤后的流数据写入到另一个Kafka主题。
  2. 代码示例(以Java为例)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class KafkaStreamsFilterExample {
    public static void main(String[] args) {
        // 配置Kafka Streams
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-example-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

        // 创建StreamsBuilder
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // 从Kafka主题读取流数据
        KStream<String, Integer> inputStream = streamsBuilder.stream("input-topic");

        // 过滤操作,假设消息值大于100
        KStream<String, Integer> filteredStream = inputStream.filter((key, value) -> value > 100);

        // 将过滤后的流数据写入到另一个Kafka主题
        filteredStream.to("output-topic");

        // 构建拓扑
        Topology topology = streamsBuilder.build();

        // 创建并启动Kafka Streams实例
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        // 关闭应用时优雅关闭Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

上述代码中,假设消息的键是字符串类型,值是整数类型,通过filter方法过滤出值大于100的消息,并将其写入到output - topic主题。如果实际应用中消息格式不同,需要相应调整Serdes和过滤条件。