Interview Question: Message Queue Operations Automation Scripts - A Complex Monitoring Script

In a Kafka message queue environment, you need to write a Python script that monitors the Kafka cluster comprehensively, including the load on each broker, the message backlog of each topic, and how partitions are distributed. When a metric exceeds a configured threshold, the script should send an alert by e-mail. Describe the overall design of the script and give the key code snippets.

Answer

Overall Design Approach

  1. Connect to the Kafka cluster: use a Python Kafka client library (such as kafka-python) to connect to the cluster and fetch its metadata, including the broker list, the topic list, and partition information.
  2. Collect the monitoring metrics:
    • Broker load: use the admin client to enumerate the brokers; per-broker load indicators such as request rate and byte rate are not exposed over the ordinary client protocol and in practice are read from each broker's JMX metrics (a sketch follows this list).
    • Topic message backlog: for every partition of a topic, the backlog is the difference between the log end offset (log_end_offset) and the consumer group's committed offset (consumer_offset); sum the per-partition values to get the topic's backlog.
    • Partition distribution: read the cluster metadata to see which brokers the partitions of each topic are placed on and which broker leads each partition.
  3. Thresholds and alerting: define a threshold for each metric; whenever a metric exceeds its threshold, send an alert e-mail with Python's smtplib library.
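
Per-broker request and byte rates come from Kafka's JMX metrics rather than from the client protocol, so one common setup is to run a Prometheus JMX exporter alongside each broker and scrape it over HTTP. The sketch below assumes such an exporter listens on port 9404 and exposes a BytesInPerSec-style metric; the port, the metric name, and the parsing are placeholders that depend on the exporter's configuration, not something kafka-python provides.

import requests  # third-party HTTP client, assumed to be installed

JMX_EXPORTER_PORT = 9404  # assumed port of a Prometheus JMX exporter on each broker

def fetch_broker_bytes_in_rate(broker_host):
    """Scrape one broker's /metrics endpoint and return its BytesInPerSec value.

    The metric name below is a placeholder; the real name depends on the
    exporter's rule file.
    """
    resp = requests.get(f'http://{broker_host}:{JMX_EXPORTER_PORT}/metrics', timeout=5)
    resp.raise_for_status()
    for line in resp.text.splitlines():
        # Prometheus text format: "<metric_name>{labels} <value>"
        if line.startswith('kafka_server_brokertopicmetrics_bytesinpersec'):
            return float(line.rsplit(' ', 1)[1])
    return None  # metric not found on this broker

The value returned here is what would be filled into broker_loads in the main script below, in place of the placeholder zero.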

Key Code Snippets

from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition
import smtplib
from email.mime.text import MIMEText

BOOTSTRAP_SERVERS = ['your_kafka_broker1:9092', 'your_kafka_broker2:9092']
CONSUMER_GROUP = 'your_consumer_group'  # consumer group whose committed offsets define the backlog

# Connect to the Kafka cluster
admin_client = KafkaAdminClient(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    client_id='monitoring_client'
)

# Broker load: the client protocol does not expose request/byte rates, so this
# only enumerates the brokers; fill in the real values from JMX (see the sketch
# after the design list above).
broker_loads = {}
for broker in admin_client.describe_cluster()['brokers']:  # each broker is a dict with 'node_id', 'host', 'port'
    broker_loads[broker['node_id']] = 0  # placeholder, to be replaced with the actual load value

# Topic message backlog (lag): log end offset minus the group's committed offset
consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS)
committed = admin_client.list_consumer_group_offsets(CONSUMER_GROUP)  # {TopicPartition: OffsetAndMetadata}
topic_backlogs = {}
for topic in consumer.topics():
    if topic.startswith('__'):  # skip internal topics such as __consumer_offsets
        continue
    partitions = [TopicPartition(topic, p) for p in (consumer.partitions_for_topic(topic) or [])]
    end_offsets = consumer.end_offsets(partitions)
    total_backlog = 0
    for tp in partitions:
        committed_offset = committed[tp].offset if tp in committed else 0
        total_backlog += max(0, end_offsets[tp] - committed_offset)
    topic_backlogs[topic] = total_backlog

# Partition distribution: which broker is the leader of each partition
topic_partition_distribution = {}
for topic_meta in admin_client.describe_topics():
    topic_partition_distribution[topic_meta['topic']] = {
        p['partition']: p['leader'] for p in topic_meta['partitions']
    }

# Thresholds and e-mail alerting
broker_load_threshold = 80
topic_backlog_threshold = 1000

def send_alert(subject, body):
    """Send an alert e-mail via SMTP."""
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'sender_email@example.com'
    msg['To'] = 'recipient_email@example.com'
    with smtplib.SMTP('smtp.example.com', 587) as server:
        server.starttls()
        server.login('sender_email@example.com', 'password')
        server.sendmail('sender_email@example.com', 'recipient_email@example.com', msg.as_string())

for broker_id, load in broker_loads.items():
    if load > broker_load_threshold:
        send_alert('Kafka Broker Load Alert',
                   f'Broker {broker_id} load {load} exceeds threshold {broker_load_threshold}')

for topic, backlog in topic_backlogs.items():
    if backlog > topic_backlog_threshold:
        send_alert('Kafka Topic Backlog Alert',
                   f'Topic {topic} backlog {backlog} exceeds threshold {topic_backlog_threshold}')
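
In practice a monitor like this runs on a schedule rather than once. A minimal sketch, assuming the collection and alerting logic above is wrapped into a check_cluster() function (a name introduced here purely for illustration):

import time

CHECK_INTERVAL_SECONDS = 60  # how often to re-evaluate the cluster

def check_cluster():
    # Hypothetical wrapper: re-run the metric collection and threshold checks shown above.
    pass

if __name__ == '__main__':
    while True:
        try:
            check_cluster()
        except Exception as exc:  # keep the monitor alive across transient Kafka/SMTP failures
            print(f'Monitoring iteration failed: {exc}')
        time.sleep(CHECK_INTERVAL_SECONDS)

Running the script from cron at a fixed interval is an equally reasonable choice and avoids keeping a long-lived process around.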