面试题答案
一键面试-
主要步骤:
- 创建
StreamsBuilder
对象,用于构建流处理拓扑。 - 使用
streamsBuilder.stream()
方法从Kafka主题读取流数据。 - 对读取的流数据应用过滤操作,过滤出满足条件的消息。
- 将过滤后的流数据写入到另一个Kafka主题。
- 创建
-
代码示例(以Java为例):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class KafkaStreamsFilterExample {
public static void main(String[] args) {
// 配置Kafka Streams
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-example-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
// 创建StreamsBuilder
StreamsBuilder streamsBuilder = new StreamsBuilder();
// 从Kafka主题读取流数据
KStream<String, Integer> inputStream = streamsBuilder.stream("input-topic");
// 过滤操作,假设消息值大于100
KStream<String, Integer> filteredStream = inputStream.filter((key, value) -> value > 100);
// 将过滤后的流数据写入到另一个Kafka主题
filteredStream.to("output-topic");
// 构建拓扑
Topology topology = streamsBuilder.build();
// 创建并启动Kafka Streams实例
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
// 关闭应用时优雅关闭Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
上述代码中,假设消息的键是字符串类型,值是整数类型,通过filter
方法过滤出值大于100的消息,并将其写入到output - topic
主题。如果实际应用中消息格式不同,需要相应调整Serdes
和过滤条件。