MST

星途 面试题库

面试题:Go固定worker工作池弹性伸缩策略之高级难度:动态调整worker数量

在刚才实现的固定worker数量的工作池基础上,增加弹性伸缩功能。当任务队列中的任务数量超过一定阈值(如100个)时,自动增加worker数量(每次增加2个);当任务队列中的任务数量低于一定阈值(如20个)时,自动减少worker数量(每次减少2个)。请详细说明实现思路并提供代码示例,同时考虑如何避免worker数量频繁波动。
43.7万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

实现思路

  1. 监控任务队列长度:在工作池运行过程中,定时检查任务队列中的任务数量。
  2. 弹性伸缩逻辑
    • 当任务队列长度超过100个时,每次增加2个worker。
    • 当任务队列长度低于20个时,每次减少2个worker。
  3. 避免频繁波动
    • 引入一个冷却时间(cooldown time)。例如,在进行一次伸缩操作后,设置一个时间段(如1分钟)内不再进行伸缩操作,以避免因任务队列长度在阈值附近频繁波动导致worker数量频繁变化。
    • 可以设置一个缓冲区,比如任务队列长度在80 - 120之间不进行增加操作,在10 - 30之间不进行减少操作,使任务队列长度有一定的波动空间而不触发伸缩。

代码示例(Python示例,假设已有基础的固定worker数量工作池代码)

import time
import threading
from queue import Queue


class Worker(threading.Thread):
    def __init__(self, task_queue):
        threading.Thread.__init__(self)
        self.task_queue = task_queue

    def run(self):
        while True:
            task = self.task_queue.get()
            if task is None:
                break
            task()
            self.task_queue.task_done()


class WorkPool:
    def __init__(self, initial_workers=5):
        self.task_queue = Queue()
        self.workers = []
        self.cooldown_time = 60  # 冷却时间60秒
        self.last_scale_time = 0
        self.min_workers = 1
        self.max_workers = 10
        self.increase_threshold = 100
        self.decrease_threshold = 20
        self.buffer_increase = 20
        self.buffer_decrease = 10
        for _ in range(initial_workers):
            worker = Worker(self.task_queue)
            worker.start()
            self.workers.append(worker)

    def add_task(self, task):
        self.task_queue.put(task)
        self.scale_workers()

    def scale_workers(self):
        current_time = time.time()
        if current_time - self.last_scale_time < self.cooldown_time:
            return
        task_count = self.task_queue.qsize()
        if task_count > self.increase_threshold + self.buffer_increase and len(self.workers) < self.max_workers:
            new_workers = min(2, self.max_workers - len(self.workers))
            for _ in range(new_workers):
                worker = Worker(self.task_queue)
                worker.start()
                self.workers.append(worker)
            self.last_scale_time = current_time
        elif task_count < self.decrease_threshold - self.buffer_decrease and len(self.workers) > self.min_workers:
            new_workers = min(2, len(self.workers) - self.min_workers)
            for _ in range(new_workers):
                self.task_queue.put(None)
                self.workers.pop().join()
            self.last_scale_time = current_time

    def wait_completion(self):
        self.task_queue.join()
        for _ in range(len(self.workers)):
            self.task_queue.put(None)
        for worker in self.workers:
            worker.join()


# 示例任务
def example_task():
    print("Task is running.")


if __name__ == "__main__":
    work_pool = WorkPool()
    for _ in range(200):
        work_pool.add_task(example_task)
    work_pool.wait_completion()

上述代码实现了在已有固定worker数量工作池基础上的弹性伸缩功能,并通过冷却时间和缓冲区避免了worker数量的频繁波动。在其他编程语言中,思路类似,只是具体实现细节会因语言特性而有所不同。