面试题答案
一键面试设计思路
- 集中式监控架构:搭建一个集中式的监控中心,各个线程池定期向监控中心汇报自身状态。这样能从全局视角掌握所有线程池状态。
- 心跳机制:线程池以固定时间间隔发送心跳信息给监控中心,表明自身“存活”且正常工作。若监控中心在一定时间内未收到某个线程池的心跳,则判定该线程池可能出现故障。
- 任务跟踪:为每个任务分配唯一标识,在任务进入线程池、开始执行、执行完成等关键节点记录任务状态。通过跟踪任务状态,判断线程池是否正常处理任务,避免任务丢失或重复执行。
- 故障检测与恢复:监控中心监测到线程池故障时,及时通知相关模块采取恢复措施,如重启线程池、转移任务到备用线程池等。
关键实现细节
- 线程池状态汇报:
- 在每个Java线程池中添加状态汇报方法,收集线程池当前活跃线程数、队列任务数、已完成任务数等关键指标。
- 使用定时任务(如ScheduledExecutorService)定期调用汇报方法,将状态数据发送给监控中心。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { ThreadPoolStatus status = getThreadPoolStatus(); sendStatusToMonitorCenter(status); }, 0, 10, TimeUnit.SECONDS);
- 心跳机制实现:
- 线程池发送心跳信息可复用状态汇报机制,在汇报数据中添加心跳标识。
- 监控中心维护一个线程池心跳记录表,记录每个线程池最后一次心跳时间。通过定时任务检查记录表,发现长时间未更新心跳的线程池,标记为疑似故障。
// 线程池发送心跳 status.setHeartbeat(true); sendStatusToMonitorCenter(status); // 监控中心检查心跳 ScheduledExecutorService heartbeatChecker = Executors.newScheduledThreadPool(1); heartbeatChecker.scheduleAtFixedRate(() -> { for (String threadPoolId : heartbeatRecords.keySet()) { long lastHeartbeatTime = heartbeatRecords.get(threadPoolId); if (System.currentTimeMillis() - lastHeartbeatTime > HEARTBEAT_TIMEOUT) { markThreadPoolAsFailed(threadPoolId); } } }, 0, 5, TimeUnit.MINUTES);
- 任务跟踪:
- 自定义任务类,包含唯一标识字段。在任务提交到线程池时生成唯一标识。
- 利用AOP(面向切面编程)或在ThreadPoolExecutor的beforeExecute和afterExecute方法中记录任务状态。例如,在beforeExecute方法中标记任务开始执行,在afterExecute方法中标记任务执行完成。
public class TrackableTask implements Runnable { private final String taskId; private final Runnable delegate; public TrackableTask(String taskId, Runnable delegate) { this.taskId = taskId; this.delegate = delegate; } @Override public void run() { delegate.run(); } } ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) { @Override protected void beforeExecute(Thread t, Runnable r) { if (r instanceof TrackableTask) { String taskId = ((TrackableTask) r).getTaskId(); taskStatusTracker.markTaskStarted(taskId); } super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { if (r instanceof TrackableTask) { String taskId = ((TrackableTask) r).getTaskId(); if (t == null) { taskStatusTracker.markTaskCompleted(taskId); } else { taskStatusTracker.markTaskFailed(taskId); } } super.afterExecute(r, t); } };
- 故障检测与恢复:
- 监控中心发现线程池故障后,通过事件机制通知相关模块。
- 恢复模块收到通知后,根据故障类型执行相应恢复操作。如对于线程池资源耗尽故障,可尝试增加线程池资源;对于节点故障,可将任务转移到其他可用节点的线程池。
// 监控中心发现故障通知 eventPublisher.publishEvent(new ThreadPoolFailureEvent(threadPoolId, failureReason)); // 恢复模块监听事件并处理 ApplicationEventMulticaster multicaster = context.getBean(ApplicationEventMulticaster.class); multicaster.addApplicationListener(new ApplicationListener<ThreadPoolFailureEvent>() { @Override public void onApplicationEvent(ThreadPoolFailureEvent event) { String threadPoolId = event.getThreadPoolId(); String failureReason = event.getFailureReason(); ThreadPoolRecoveryStrategy strategy = recoveryStrategyFactory.getStrategy(failureReason); strategy.recover(threadPoolId); } });