MST

星途 面试题库

面试题:Python多线程TCP服务器的复杂场景应对

假设在一个Python多线程TCP服务器中,需要实现一种机制:当某个客户端连续发送恶意请求(例如,超过一定频率的无效请求)时,服务器能够自动识别并暂时封禁该客户端的IP,一段时间后解封。请详细阐述实现这个机制的整体思路,并且给出关键部分的Python代码实现,同时分析可能遇到的并发控制和性能问题以及解决方案。
42.1万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

整体思路

  1. 请求频率计数:使用一个字典来记录每个客户端IP的请求次数和最后请求时间。每次收到请求时,更新对应IP的计数和时间。
  2. 恶意请求判断:设定一个时间窗口和最大请求次数阈值。在每次收到请求时,检查在当前时间窗口内该IP的请求次数是否超过阈值。如果超过,则判定为恶意请求。
  3. IP封禁:如果判定为恶意请求,将该IP添加到封禁列表中,并记录封禁时间。
  4. 解封:启动一个后台线程,定期检查封禁列表,对已达到封禁时间的IP进行解封。

关键Python代码实现

import threading
import time
from collections import defaultdict

# 存储每个IP的请求计数和最后请求时间
request_count = defaultdict(lambda: [0, 0])
# 封禁列表
banned_ips = set()
# 时间窗口(秒)
WINDOW = 60
# 最大请求次数阈值
THRESHOLD = 10
# 封禁时间(秒)
BAN_TIME = 300

def check_request(ip):
    current_time = time.time()
    count, last_time = request_count[ip]
    if current_time - last_time <= WINDOW:
        request_count[ip][0] += 1
        if request_count[ip][0] > THRESHOLD:
            ban_ip(ip)
    else:
        request_count[ip] = [1, current_time]

def ban_ip(ip):
    if ip not in banned_ips:
        banned_ips.add(ip)
        threading.Thread(target=unban_ip, args=(ip,)).start()

def unban_ip(ip):
    time.sleep(BAN_TIME)
    if ip in banned_ips:
        banned_ips.remove(ip)

# 模拟处理客户端请求
def handle_client_request(client_ip):
    if client_ip in banned_ips:
        return
    check_request(client_ip)
    # 正常处理请求的代码
    print(f"Handling request from {client_ip}")

并发控制和性能问题及解决方案

  1. 并发控制问题
    • 共享数据竞争request_countbanned_ips 是共享数据,多个线程可能同时访问和修改,导致数据不一致。
    • 解决方案:使用 threading.Lock 来保护共享数据。例如:
lock = threading.Lock()

def check_request(ip):
    with lock:
        current_time = time.time()
        count, last_time = request_count[ip]
        if current_time - last_time <= WINDOW:
            request_count[ip][0] += 1
            if request_count[ip][0] > THRESHOLD:
                ban_ip(ip)
        else:
            request_count[ip] = [1, current_time]


def ban_ip(ip):
    with lock:
        if ip not in banned_ips:
            banned_ips.add(ip)
            threading.Thread(target=unban_ip, args=(ip,)).start()


def unban_ip(ip):
    time.sleep(BAN_TIME)
    with lock:
        if ip in banned_ips:
            banned_ips.remove(ip)
  1. 性能问题
    • 频繁的锁操作:过多的锁操作会导致线程等待,降低性能。
    • 解决方案:尽量减少锁的粒度,例如在更新 request_count 时,只在读取和更新操作时加锁,而不是整个函数都加锁。
    • 定时器开销:使用 time.sleep 来实现解封逻辑,可能会导致线程长时间占用资源。
    • 解决方案:可以使用 threading.Timer 类,它在后台线程中运行,并且在指定时间后执行解封操作,而不会阻塞主线程。例如:
def ban_ip(ip):
    with lock:
        if ip not in banned_ips:
            banned_ips.add(ip)
            threading.Timer(BAN_TIME, unban_ip, args=(ip,)).start()


def unban_ip(ip):
    with lock:
        if ip in banned_ips:
            banned_ips.remove(ip)