MST

星途 面试题库

面试题:Java ArrayBlockingQueue与线程池的深度集成及定制化

假设现有一个复杂的分布式系统,要求在不同的业务模块使用不同的线程池,且每个线程池中的ArrayBlockingQueue需要根据业务特点实现定制化的拒绝策略和动态调整队列容量的功能。请描述实现该需求的整体架构思路,并给出关键代码示例。
35.7万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

整体架构思路

  1. 业务模块划分:明确系统中的各个业务模块,每个模块有其独特的业务逻辑和资源需求。
  2. 线程池定制:为每个业务模块创建独立的线程池。在创建线程池时,使用ThreadPoolExecutor类,因为它允许我们自定义线程池的核心线程数、最大线程数、存活时间等参数,并且可以设置定制化的BlockingQueue
  3. 定制化拒绝策略:实现RejectedExecutionHandler接口来创建定制化的拒绝策略。根据业务需求,例如记录日志、尝试重新提交任务、直接抛弃任务等不同策略。
  4. 动态调整队列容量:通过在ArrayBlockingQueue外部包装一个类,提供方法来动态修改ArrayBlockingQueue的容量。可以通过反射机制或者重新创建一个ArrayBlockingQueue并替换原有队列的方式来实现。

关键代码示例

  1. 定制化拒绝策略
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志
        System.out.println("Task " + r.toString() + " rejected from " + executor.toString());
        // 这里可以添加更多业务逻辑,如尝试重新提交任务等
    }
}
  1. 线程池创建及配置
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BusinessModuleThreadPool {
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final long KEEP_ALIVE_TIME = 10;
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
    private static final int INITIAL_CAPACITY = 20;

    private ThreadPoolExecutor executor;
    private BlockingQueue<Runnable> queue;

    public BusinessModuleThreadPool() {
        queue = new ArrayBlockingQueue<>(INITIAL_CAPACITY);
        executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TIME_UNIT,
                queue,
                new CustomRejectedExecutionHandler());
    }

    public void submitTask(Runnable task) {
        executor.submit(task);
    }

    // 动态调整队列容量方法
    public void adjustQueueCapacity(int newCapacity) {
        BlockingQueue<Runnable> newQueue = new ArrayBlockingQueue<>(newCapacity);
        executor.setQueue(newQueue);
        this.queue = newQueue;
    }
}
  1. 使用示例
public class Main {
    public static void main(String[] args) {
        BusinessModuleThreadPool threadPool = new BusinessModuleThreadPool();
        for (int i = 0; i < 30; i++) {
            int taskNumber = i;
            threadPool.submitTask(() -> {
                System.out.println("Task " + taskNumber + " is running.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // 动态调整队列容量
        threadPool.adjustQueueCapacity(30);
    }
}