面试题答案
一键面试在Kafka消费者组中,通过以下方式配置消费者以精准消费特定分区的消息:
- 配置
partition.assignment.strategy
参数:- 含义:该参数用于指定分区分配策略。
- 取值:有
RangeAssignor
(范围分配策略,默认策略,按消费者ID排序,依次为每个消费者分配分区)、RoundRobinAssignor
(轮询分配策略,将所有分区和消费者进行排序,然后轮询分配)和StickyAssignor
(黏性分配策略,尽量保持现有分区分配状态,减少分区移动)。如果要精准消费特定分区,需要自定义分配策略。
- 自定义分区分配策略:
- 实现
Partitioner
接口:- 含义:自定义一个类实现
org.apache.kafka.clients.producer.Partitioner
接口,在partition
方法中实现特定的分区逻辑。 - 示例代码:
- 含义:自定义一个类实现
- 实现
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.DefaultRecordAccumulator;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 这里编写自定义逻辑,例如根据key指定分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 简单示例:假设根据key为字符串,取其长度对分区数取模
if (keyBytes == null) {
return DefaultRecordAccumulator.randomPartition(topic, numPartitions, Utils.murmur2(valueBytes));
} else {
return Math.abs(keyBytes.hashCode()) % numPartitions;
}
}
@Override
public void close() {
// 关闭时的逻辑
}
@Override
public void configure(Map<String, ?> configs) {
// 配置时的逻辑
}
}
- 在消费者配置中指定自定义分区分配策略:
- 配置参数:
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CustomPartitioner.class.getName());
- 含义:将消费者的分区分配策略设置为自定义的
CustomPartitioner
策略,从而实现精准消费特定分区的消息。
- 配置参数:
此外,在一些场景下,还可以通过assign
方法直接指定消费者消费的分区:
- 使用
assign
方法:- 含义:
KafkaConsumer
类的assign
方法可以直接指定消费者订阅的分区。 - 示例代码:
- 含义:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class SpecificPartitionConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 直接指定消费分区
TopicPartition partition0 = new TopicPartition("test-topic", 0);
consumer.assign(Arrays.asList(partition0));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
records.forEach(record -> {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
});
}
}
}
- **注意**:使用`assign`方法时,消费者将不再参与消费者组的分区分配,而是独立消费指定的分区。并且如果手动指定分区,消费者需要自行管理偏移量,不会依赖消费者组的自动偏移量管理。