MST

星途 面试题库

面试题:消息队列之Kafka Streams应用基础

在基于Kafka Streams开发实时流处理应用时,Kafka Streams的拓扑(Topology)是什么?它有什么作用?请简要描述如何构建一个简单的拓扑结构来处理Kafka主题中的数据。
38.0万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

Kafka Streams的拓扑(Topology)定义

Kafka Streams的拓扑是一个有向无环图(DAG),由节点(nodes)和边(edges)组成。节点代表处理数据的操作,比如过滤、映射、聚合等;边则表示数据在不同操作之间的流动路径,即数据从一个操作传递到下一个操作的流向。

Kafka Streams拓扑的作用

  1. 定义处理逻辑:拓扑明确了数据从输入到输出所经历的一系列处理步骤,使得开发人员能够清晰地描述实时流处理的业务逻辑。
  2. 数据流动管理:它控制着数据如何在不同的处理阶段之间流动,确保数据按照预定的流程进行处理,避免数据混乱或丢失。
  3. 分布式执行规划:Kafka Streams基于拓扑结构在多个实例间进行分布式执行规划,以实现高效的并行处理和水平扩展。

构建简单拓扑结构处理Kafka主题数据的步骤

  1. 引入依赖:在项目的构建文件(如Maven的pom.xml或Gradle的build.gradle)中引入Kafka Streams相关依赖。
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>x.x.x</version>
</dependency>
  1. 创建StreamsBuilder
StreamsBuilder builder = new StreamsBuilder();
  1. 定义数据源:从Kafka主题读取数据,创建KStream对象。
KStream<String, String> stream = builder.stream("input-topic");

这里假设输入主题中的数据是键值对形式,类型为String。 4. 定义处理操作:例如,进行简单的映射操作,将输入数据的每个值转换为大写。

KStream<String, String> transformedStream = stream.mapValues(String::toUpperCase);
  1. 定义输出:将处理后的数据写入另一个Kafka主题。
transformedStream.to("output-topic");
  1. 构建拓扑并创建KafkaStreams实例
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);

这里props是Kafka Streams的配置属性,如应用ID、Kafka集群地址等。 7. 启动KafkaStreams

streams.start();

完整示例代码如下:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class SimpleKafkaStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        KStream<String, String> transformedStream = stream.mapValues(String::toUpperCase);
        transformedStream.to("output-topic");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

上述代码展示了如何构建一个简单的Kafka Streams拓扑,从input-topic读取数据,将值转换为大写后写入output-topic