- 基于链表结构:
ConcurrentLinkedQueue
是一个基于链表的无界队列。链表节点 Node
结构如下:
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// 省略其他代码
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
} catch (Exception e) {
throw new Error(e);
}
}
}
- 使用 volatile 关键字:
Node
类中的 item
和 next
字段都被声明为 volatile
。这确保了对这些字段的读写操作具有可见性,即一个线程对 item
或 next
的修改,其他线程能够立即看到。例如,当一个线程修改了 next
引用,指向新的节点,其他线程能够感知到这个变化。
- 入队操作(offer 方法):
offer
方法使用 CAS
(Compare - And - Swap)操作来确保线程安全。
- 首先获取尾节点
t
,然后尝试通过 CAS
将尾节点的 next
字段设置为新节点。如果设置成功,说明入队操作成功。
- 如果
CAS
操作失败,可能是因为其他线程同时修改了尾节点,此时需要重新获取尾节点并再次尝试 CAS
操作,直到成功。
- 示例代码如下:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
return true;
}
}
else if (p == q)
p = (t != (t = tail))? t : head;
else
p = q;
}
}
- 出队操作(poll 方法):
poll
方法同样使用 CAS
操作来实现线程安全。
- 首先获取头节点
h
,然后尝试通过 CAS
将头节点的 item
字段设置为 null
,并将头节点的 next
字段设置为新的头节点。如果设置成功,说明出队操作成功,返回原来头节点的 item
。
- 如果
CAS
操作失败,可能是因为其他线程同时进行了出队操作,此时需要重新获取头节点并再次尝试 CAS
操作,直到成功。
- 示例代码如下:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
if (p != h)
updateHead(h, ((q = p.next) != null)? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
- 总结:
ConcurrentLinkedQueue
通过 volatile
保证字段的可见性,在入队和出队操作中使用 CAS
操作来原子性地更新节点的 item
和 next
字段,从而有效地处理并发情况,保证了线程安全。