面试题答案
一键面试实现思路
- 监控任务队列长度:在工作池运行过程中,定时检查任务队列中的任务数量。
- 弹性伸缩逻辑:
- 当任务队列长度超过100个时,每次增加2个worker。
- 当任务队列长度低于20个时,每次减少2个worker。
- 避免频繁波动:
- 引入一个冷却时间(cooldown time)。例如,在进行一次伸缩操作后,设置一个时间段(如1分钟)内不再进行伸缩操作,以避免因任务队列长度在阈值附近频繁波动导致worker数量频繁变化。
- 可以设置一个缓冲区,比如任务队列长度在80 - 120之间不进行增加操作,在10 - 30之间不进行减少操作,使任务队列长度有一定的波动空间而不触发伸缩。
代码示例(Python示例,假设已有基础的固定worker数量工作池代码)
import time
import threading
from queue import Queue
class Worker(threading.Thread):
def __init__(self, task_queue):
threading.Thread.__init__(self)
self.task_queue = task_queue
def run(self):
while True:
task = self.task_queue.get()
if task is None:
break
task()
self.task_queue.task_done()
class WorkPool:
def __init__(self, initial_workers=5):
self.task_queue = Queue()
self.workers = []
self.cooldown_time = 60 # 冷却时间60秒
self.last_scale_time = 0
self.min_workers = 1
self.max_workers = 10
self.increase_threshold = 100
self.decrease_threshold = 20
self.buffer_increase = 20
self.buffer_decrease = 10
for _ in range(initial_workers):
worker = Worker(self.task_queue)
worker.start()
self.workers.append(worker)
def add_task(self, task):
self.task_queue.put(task)
self.scale_workers()
def scale_workers(self):
current_time = time.time()
if current_time - self.last_scale_time < self.cooldown_time:
return
task_count = self.task_queue.qsize()
if task_count > self.increase_threshold + self.buffer_increase and len(self.workers) < self.max_workers:
new_workers = min(2, self.max_workers - len(self.workers))
for _ in range(new_workers):
worker = Worker(self.task_queue)
worker.start()
self.workers.append(worker)
self.last_scale_time = current_time
elif task_count < self.decrease_threshold - self.buffer_decrease and len(self.workers) > self.min_workers:
new_workers = min(2, len(self.workers) - self.min_workers)
for _ in range(new_workers):
self.task_queue.put(None)
self.workers.pop().join()
self.last_scale_time = current_time
def wait_completion(self):
self.task_queue.join()
for _ in range(len(self.workers)):
self.task_queue.put(None)
for worker in self.workers:
worker.join()
# 示例任务
def example_task():
print("Task is running.")
if __name__ == "__main__":
work_pool = WorkPool()
for _ in range(200):
work_pool.add_task(example_task)
work_pool.wait_completion()
上述代码实现了在已有固定worker数量工作池基础上的弹性伸缩功能,并通过冷却时间和缓冲区避免了worker数量的频繁波动。在其他编程语言中,思路类似,只是具体实现细节会因语言特性而有所不同。