若图片有问题,请点击此处查看
本文开始介绍并发队列,为后面介绍线程池打下基础。并发队列莫非也是出队、入队操作,还有一个比较重要的点就是如何保证其线程安全性,有些并发队列保证线程安全是通过lock,有些是通过CAS。 我们从ConcurrentLinkedQueue开始吧。
1. 介绍
ConcurrentLinkedQueue是集合框架的一员,是一个无界限且线程安全,基于单向链表的队列。该队列的顺序是FIFO。当多线程访问公共集合时,使用这个类是一个不错的选择。不允许null元素。是一个非阻塞的队列。
它的迭代器是弱一致性的,不会抛出java.util.ConcurrentModificationException,也可能在迭代期间,其他操作也正在进行。size()方法,不能保证是正确的,因为在迭代时,其他线程也可以操作该队列。
1.1 类图
(显示的方法都是公有方法)
1 |
|
继承至AbstractQueue,他提供了队列操作的一个框架,有基本的方法,add、remove,element等等,这些方法基于offer,poll,peek(最主要看这几个方法)。
2. 源码分析
2.1 类的整体结构
队列中的元素Node
1 |
|
构造器1:
1 |
|
构造器2:
1 |
|
下面开始讲方法,从offer,poll,peek从这几个方法入手
2.2 offer
添加元素到队尾。因为队列是无界的,这个方法永远不会返回false
分为三种情况进行分析(一定自己跟着代码debug,一步步的走)
- 单线程时(使用IDEA debug一直进入的是 **else if把我搞迷茫了,我会写一个博客来解释原因**)
1 |
|
以上面的代码,分析每一个步骤。 执行构造函数后:
此时链表的head与tail指向哨兵节点
插入”A”, 此时没有设置tail(‘两跳机制’,这里的原因后面详见)
插入”B”,
单线程情况比较简单
- 多线程offer时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node // 只有一个线程能够CAS成功,其余的都重试 if (p.casNext(null, newNode)) { // 延迟设置tail,第一个node入队不会设置tail,第二个node入队才会设置tail //以此类推, '两跳机制' if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } // 这里是有其他线程正在poll操作才会进入,此时只考虑多线程offer的情况,暂不分析 else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. // 存在tail被更改前,和更改后的两种情况 p = (p != t && t != (t = tail)) ? t : q; } }
结合上面的代码,看图
- 步骤一,线程A、线程B都执行到
1
if (p.casNext(null, newNode))
- 步骤二,只有一个线程执行成功,假设线程A成功,线程B失败
因为p(a) == t(a), 此时不执行casTail,tail不变。q = p.next, 所以此时q(b) = Node2 ,那么 p(b) != q(b), 线程B执行p = (p != t && t != (t = tail)) ? t : q;
线程B即将执行
1 |
|
- 步骤三 此时线程C进入。
此时,p(c) != q(c), 线程C执行
1
1
p = (p != t && t != (t = tail)) ? t : q;
执行完后,q(c)赋值给p(c). 再次循环,此时,q(c) == null, 设置p(c)的next,线程C将值入队
- 步骤四 p(c) != t(c), 线程C执行casTail(t, newNode), 线程C设置尾结点
- 此时线程B执行
1
1
p = (p != t && t != (t = tail)) ? t : q;
因为p(b) == t(b),所以 q(b) 赋值给 p(b)。继续循环,最后得到
- 多线程的另一种情况,回到步骤三,此时线程C把值入队了,但是还没有设置tail
- 线程B,将值入队成功 在步骤三的基础上,线程B入队成功后,目前的状况如下:
此时,线程C执行casTail(t, newNode),但是现在的tail != t(c), CAS失败, 直接返回。
2.2.1 小结
上面不管是多线程还是单线程,都是努力的去寻找next为null的节点,若为next节点为null,再判断是否满足设置tail的条件。
多线程offer的第一种情况存在设置tail滞后的问题,我把它称之为“两跳机制”,后面讲使用这种机制的原因。 我们看到上面的情况一直没有进入else if (p == q)分支,进入else if分支只会发生在有其他线程在poll时,我们先讲讲poll,再讲讲何时进入else if分支。
2.3 poll
删除并返回头结点的值
简单提一下单线程与多线程的poll,着重分析一下poll与offer共存的情况
- 单线程时
单线程比较简单,就不画图了,按照上面的queue,进行一步一步的debug就行了
- 多线程,只有poll时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // casItem这里只有一个线程能够成功,其余的继续下面的代码 if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
1 |
|
从上面代码可以看出,修改item与head都会使用CAS,这些变量都是被volatile修饰,所以保证了这些变量的线程安全性。不管是单线程还是多线程的poll,它们都是去寻找一个有效的头节点,删除并返回该值,若不是有效的就继续找,若队列为空了,就返回null。
最后分析一下,offer与poll共存的情况
- 线程A做offer操作,线程B做poll操作,初始的状态如下:
- 线程A进入。
- 线程A将要执行
1
1
Node<E> q = p.next;
线程B进入,进行poll操作 此时,线程B执行了一次内循环,将q(b)赋值给了p(b);
- 线程B再次执行内循环,此时将p(b).item置空,将p(b)赋值给head,之前的h(b)的next指向自己,线程B退出
- 线程A执行
1
1
Node<E> q = p.next;
此时,p(a).next 指向自己(等待被GC), 进入else if (p == q)分支,线程A退出,经过一番执行后,最后得到的状态,如下:
进入else if (p == q)分支的情况,只会发生在poll与offer共存的情况下。
2.4 peek
获取首个有效的节点,并返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public E peek() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
peek与poll的操作类似,这里就贴一下代码就是了。
3. 总结
ConcurrentLinkedQueue是使用非阻塞的方式保证线程的安全性,在设置关系到整个Queue结构的变量时(这些变量都被volatile修饰),都使用CAS的方式对它们进行赋值。
- size方法是线程不安全的,返回的结果可能不准确
关于“两跳机制”(自己取得名字),
Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.
Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? – ConcurrentLinkedQueue
大致意思,head与tail允许被延迟设置。不是每次更新它们是一个重大的优化,这样做就可以更少的CAS(这样在很多线程使用时,积少成多,效率更高)。它的延迟阈值是2,设置head/tail时,当前的结点离first/last有两步或更多的距离。 这就是“两跳机制”
我们想不通的地方,可能是这个类或方法的一个优化的地方。向着大佬看齐~
4. 引用
Java多线程 39 - ConcurrentLinkedQueue详解,讲的非常好,上面的思路是跟着他来的