0%

ConcurrentLinkedQueue源码解析

ConcurrentLinkedQueue是使用非阻塞的方式来实现线程安全队列,采用CAS(Compare and Swap)无锁算法。非阻塞同步算法与CAS(Compare and Swap)无锁算法 - Mainz

1 结构

ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tair节点等于head节点。
CAS(乐观锁算法)的基本假设前提
CAS比较与交换的伪代码可以表示为:

1
2
3
4
do{   
备份旧数据;
基于旧数据构造新数据;
}while(!CAS( 内存地址,备份的旧数据,新数据 ))

2.入队

入队结构图
第一步添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
第二步添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
第三步添加元素3,设置tail节点的next节点为元素3节点。
第四步添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e);
retry:
for (;;) {
Node<E> t = tail;
Node<E> p = t;
for (int hops = 0; ; hops++) {
Node<E> next = succ(p);
if (next != null) {
if (hops > HOPS && t != tail)
continue retry;
p = next;
} else if (p.casNext(null, n)) {//如果p是尾节点,则设置p节点的next节点为入队节点。
if (hops >= HOPS)
casTail(t, n); // Failure is OK.
return true;
} else {
p = succ(p);
}
}
}
}

(1)创建临时变量保存tail节点。
(2)取出tail的next节点。
(3)如果next为null,表示p是尾节点,则将插入节点放入p.next。
(4)更新tail的值。允许失败
(5)如果插入不成功则再次判断p是否存在next。

就是指当两者进行比较时,如果相等,则证明共享数据没有被修改,替换成新值,然后继续往下运行;如果不相等,说明共享数据已经被修改,放弃已经所做的操作,然后重新执行刚才的操作。容易看出 CAS 操作是基于共享数据不会被修改的假设,采用了类似于数据库的 commit-retry 的模式。当同步冲突出现的机会很少时,这种假设能带来较大的性能提升。

3.出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E poll() {
Node<E> h = head;
Node<E> p = h;
for (int hops = 0; ; hops++) {
E item = p.getItem();

if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
Node<E> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
return item;
}
Node<E> next = succ(p);
if (next == null) {
updateHead(h, p);
break;
}
p = next;
}
return null;
}

首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。