若图片有问题,请点击此处查看
本文讲ArrayBlockingQueue
1. 介绍
一个基于数组的有界阻塞队列,FIFO顺序。支持等待消费者和生产者线程的可选公平策略(默认是非公平的)。公平的话通常会降低吞吐量,但是可以减少可变性并避免之前被阻塞的线程饥饿。
1.1 类结构
- ArrayBlockingQueue继承关系
- ArrayBlockingQueue类图
构造器
1 |
|
比较重要的几个参数
1 |
|
保证线程安全的措施
1 |
|
我们可以看到ArrayBlockingQueue使用的是单锁控制线程安全,而LinkedBlockingQueue是双锁控制的, 后者的细粒度更小。
2. 源码剖析
ArrayBlockingQueue也是继承至BlockingQueue(可以去看看上面提到的那篇博客有提到BlockingQueue),它对于不同的方法不能立即满足要求的,作出的回应是不一样的。
我们分别介绍下面的方法的具体实现
- offer(E e)
- offer(E e, long timeout, TimeUnit unit)
- put(E e)
- poll()
- remove(Object o)
2.1 offer(E e) & poll()
插入成功就返回true;若队列满了就直接返回false,不会阻塞自己
1 |
|
上面的代码比较简单,我们来看看入队的具体操作
1 |
|
为了解答上面注释中的问题,我们先看看poll()的实现
1 |
|
1 |
|
结合上面的入队、出队源码,我们来分析一下:
- 单线程下,首先执行
1
2
3
4ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(3); array.offer("A"); array.offer("B"); array.offer("C");
此时队列的状态
- 再执行
1
2array.poll(); array.offer("D");
最后队列的状态
大家可能会有点疑问,上面的队列不是输出是“D B C”, 咋回事? 肯定不是啦,我们看看类重写的toString就明白了。
1 |
|
思考一下,就会明白了。 通过上面的分析,我们看出了数组就像一个循环数组一样,每个地址都被重复使用。我们也知道了基于数组的队列如何实现的。
offer(E e, long timeout, TimeUnit unit) 与 put(E e)实现都比较简单,大家看看源码即可。
2.2 remove(Object o)
若o存在则移除,返回true;反之。这个操作会改变队列的结构,
但是该方法一般很少使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { // 主要删除逻辑 removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
1 |
|
2.3 解释解释Itrs
1 |
|
这个变量可以理解成,在一个线程使用迭代器时,其他的线程可以对队列进行更新操作的一个保障。 源码注释中对Itrs的描述,迭代器和它们的队列之间共享数据,允许在删除元素时修改队列以更新迭代器。 我们可以看到对队列进行了删除操作时,队列都会执行下面的语句
1 |
|
初始化该值是在使用迭代器时
1 |
|
3. 总结
ArrayBlockingQueue的实现整体不难,使用ReetrantLock保证了线程安全,putIndex与takeIndex分别维护入队与出队的位置,一起构成一个循环数组