设计思路
- 继承与扩展:继承自
AbstractQueue
类并实现BlockingQueue
接口,以复用部分队列操作的基础实现,并确保满足阻塞队列的特性。
- 任务存储:使用
Deque
(如LinkedList
实现的Deque
)作为底层数据结构来存储任务,因为Deque
提供了高效的两端操作,符合队列的先进先出特性。
- 阈值与事件触发:定义一个阈值变量,当向队列添加任务使得任务数量达到阈值时,触发特定事件。
- 线程安全:使用
ReentrantLock
和Condition
来实现线程安全的阻塞和唤醒操作,保证在多线程环境下的正确性。
关键实现点
- 构造函数:初始化阈值和底层的
Deque
。
offer
方法:尝试将任务添加到队列中,如果队列已满,根据阻塞队列的特性进行阻塞或返回false
。同时检查任务数量是否达到阈值,若达到则触发事件。
take
方法:从队列中取出任务,如果队列为空则阻塞当前线程,直到有任务可取出。
- 事件触发:在
offer
方法中,当任务数量达到阈值时,调用日志记录方法或通知监控系统的方法。
示例代码
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class CustomBlockingQueue<T> implements BlockingQueue<T> {
private final Deque<T> queue;
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private static final int THRESHOLD = 10;
public CustomBlockingQueue(int capacity) {
this.queue = new LinkedList<>();
this.capacity = capacity;
}
@Override
public boolean offer(T e) {
lock.lock();
try {
if (queue.size() >= capacity) {
return false;
}
boolean added = queue.offer(e);
if (queue.size() == THRESHOLD) {
triggerEvent();
}
notEmpty.signal();
return added;
} finally {
lock.unlock();
}
}
@Override
public void put(T e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.size() >= capacity) {
notFull.await();
}
queue.add(e);
if (queue.size() == THRESHOLD) {
triggerEvent();
}
notEmpty.signal();
} finally {
lock.unlock();
}
}
@Override
public T take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
T removed = queue.remove();
notFull.signal();
return removed;
} finally {
lock.unlock();
}
}
private void triggerEvent() {
// 记录日志或通知监控系统的逻辑
System.out.println("Threshold reached, triggering event...");
}
// 其他未实现的BlockingQueue方法
//...
}
适用的实际业务场景
- 流量控制:在高并发的网络服务中,例如处理HTTP请求的线程池。当请求队列达到一定阈值时,记录日志以便分析流量峰值,或者通知监控系统进行预警,防止系统因过载而崩溃。
- 资源管理:在数据库连接池或文件资源管理的线程池中,当等待获取资源的任务队列达到阈值时,记录日志以优化资源配置,或者通知系统进行资源扩容。
现有标准阻塞队列不足
- 阈值事件触发:标准的
BlockingQueue
(如ArrayBlockingQueue
、LinkedBlockingQueue
)没有直接提供当队列任务数量达到特定阈值时触发事件的功能。虽然可以通过外部计数来实现类似效果,但不够优雅且难以保证线程安全。
- 定制化事件:标准阻塞队列专注于线程安全的任务存储和阻塞/唤醒机制,无法直接满足用户自定义的事件触发需求,如记录特定日志或通知监控系统。