面试官:LinkedBlockingDeque和SynchronousQueue工作原理一样吗?
========
面试官:LinkedBlockingDeque和SynchronousQueue工作原理一样吗?
求职者:对不起,队列太多,我记不得了……
经过之前的学习,我们发现队列真的是一种非常高效的数据结构。而世界上很多的问题,都可以被抽象成入队、出队的操作——更加准确的说:生产者、消费者问题应用场景非常的多。从实践的角度,我们不会经常使用同步器,而是使用Java语言在同步器基础之上再封装的结构,其中阻塞队列(BlockingQueue)就是我们平时最常用的一类数据结构。构造业务逻辑,处理批量任务,处理线程池,我们都需要掌握阻塞队列。
这节课,我会为大家介绍7种继承于BlockingQueue的阻塞队列,以及他们的应用场景。
- ArrayBlockingQueue
- LinkedBlockingQueue
- LinkedBlockingDeque
- PriorityBlockingQueue
- DelayQueue
- LinkedTransferQueue
- SynchronousQueue
打破传统的学习方式,我不会按照1-7的顺序,如念经般给你逐一介绍;而是,我会从问题入手,从不同的场景切入,带你思考不同的方案。
本节关联的面试题目:
- ArrayBlockingQueue和LinkedBlockingQueue的区别?
- add/offer/put的区别?remove/poll/take的区别?
- DelayQueue的应用场景是什么?
- SynchronousQueue的应用场景是什么?
- LinkedTransferQueue和SynchronousQueue有什么异同?
- LinkedBlockingDeque和SynchronousQueue工作原理一样吗?
生产者消费者问题
在并发编程中,生产者、消费者问题,是一类非常重要的问题;也是BlockingQueue的一大应用场景。生产者消费者问题由著名的计算机科学家Dijkstra提出,是一个经典的多处理问题(multi-processing)。生产者产生数据,消费者消费数据。 中间用一个有界限的缓冲区连接。
缓冲区通常是有界限的(有大小限制),因此也称称作:有限缓冲区问题(bounded-buffer problem)。但是随着时代发展,缓冲区开始支持自动扩容,尤其是链表这种跳跃数据结构,允许节点在内存分配上不连续,扩容的成本非常低。因此我们也会使用无界缓冲区(unbounded buffer)。
睡觉的理发师问题
下图是睡觉的理发师,是一种经典的有界缓冲区生产者消费者问题:
顾客进入理发店后,现在等待坐席上等待。睡觉的理发师发现有顾客来到,被唤醒,优先处理等待时间最长的顾客。理完发的顾客离开理发店。如果等待坐席满了,新顾客就不会再进来。在这个故事中:
- 理发师是消费者(Consumer)
- 顾客不断到来这个显现是生产者(Producer)
- 没顾客的时候理发师休眠(消费者在等待队列空时休眠)
- 坐满了顾客不再进入(生产者在等待队列满的情况下休眠)
思考:程序怎么写?
下面程序(伪代码),用itemCount
代表缓冲区中元素的个数。生产过程中生产者用produceItem
生产数据,然后用putItemIntoBuffer
将元素放入缓冲区。消费者用remoteItemFromBuffer
取出缓冲区的元素,然后减少itemCount
。
如果缓冲区满了itemCount == BUFFER_SIZE
,生产者休眠。如果缓冲区空了itemCount == 0
消费者休眠。当开始生产时,itemCount=1
,唤醒消费者。 当开始消费时,itemCount == BUFFER_SIZE_1
时,唤醒生产者。
int itemCount = 0;
void producer()
{
while (true)
{
item = produceItem();
if (itemCount == BUFFER_SIZE)
sleep();
putItemIntoBuffer(item);
itemCount = itemCount + 1;
if (itemCount == 1)
wakeup(consumer);
}
}
void consumer()
{
while (true)
{
if (itemCount == 0)
sleep();
item = removeItemFromBuffer();
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1)
wakeup(producer);
consumeItem(item);
}
}
这个程序有个致命的缺陷,这个程序没有考虑到竞争条件——会导致死锁。死锁就是多个线程互相等待,这里就是生产者等待消费者唤醒,消费者等待生产者唤醒。
假设缓冲区大小是100, 当itemCount==100
时,生产者本来要休眠。但是如果sleep
执行前触发了线程切换,消费者开始执行。 消费者从缓冲区取出元素,然后尝试唤醒生产者。生产者线程收到唤醒信号,醒来,然后执行下一句sleep
进入休眠。消费者继续执行,把缓冲区全部清空,也陷入休眠。这样,就构成了死锁。
为了解决死锁问题,可以考虑用synchronized
关键字或者锁,对producer
和consumer
方法上锁,但是这样锁的范围太大。
另一种,就是对所有读写操作itemCount
的位置分别上锁,但是这样程序太复杂。比如下面这样,需要处理的个例太多。
synchronized(...) {
if (itemCount == 0)
sleep();
}
因此,通常我们考虑用Semaphore来解决这个问题。Semaphore提供了一对原子操作up
down
。生产过程中将空闲数(emptyCount)减1,然后将填充数(fillCount)加1。消费过程将fillCount减1,将emptyCount加1。如果缓冲区满了,emptyCount=1
,这个时候down操作会导致生产者休眠。如果缓冲区空了,fillCount=0
,down操作会导致消费者休眠。
int fillCount = 0; // items produced
int emptyCount = BUFFER_SIZE; // remaining space
void producer()
{
while (true)
{
item = produceItem();
down(emptyCount);
putItemIntoBuffer(item);
up(fillCount);
}
}
void consumer()
{
while (true)
{
down(fillCount);
item = removeItemFromBuffer();
up(emptyCount);
consumeItem(item);
}
}
上面操作up
和down
操作都是基于CAS的原子操作,up
和down
之间允许多个线程同时进入。只要fillCount
或者emptyCount
不为0。removeItemFromBuffer
和putItemIntoBuffer
可能需要单独上锁,保证互斥,具体背后操作缓冲区逻辑我这里没有给出。
上面的up和down方法是操作系统提供semaphore的习惯。Java里这两个操作叫做acquire和release。
最后还有一种方法是利用条件变量——Condition。具体的程序在4.4上实现ProducerCustomerModel
时我们已经给出,感兴趣的朋友可以参考。
思考:如何更优雅?上面程序从面向对象角度看还有没有更好的设计?
上面的程序在使用队列时,需要考虑很多同步细节(这里用了Semaphore)。因为使用生产者消费者是一个高频场景,因此可以考虑封装。将同步能力封装进入队列中,这就是Java提供的BlockingQueue Interface,以及它的7种实现。
生产者、消费者模型的用途
在多线程环境中,生产者消费者模型,往往用队列(Queue)实现。队列的用途有很多,比如说web服务,比如说消息队列。
如图,web服务的模型非常类似睡觉的理发师。 只不过从单生产者、单消费者模型,变成了多生产者、多消费者模型。因此队列的**入队(enqueue)和出队(dequeue)**操作需要考虑竞争条件。复习一下,对入队、出队上锁是一种最常见的同步方法,也就是互斥(Mutual Exclusion)——对临界区上锁。比上锁并发度更高的做法,是用少数几个CAS操作,比如上节课我们学习的CHL链表。CAS+自旋判断的逻辑,如果能够在少数几个自旋判断中获得临界区的控制权限,那么这个过程将会比锁速度快。
另一个常见的用途是消息队列(Message Queue)。在单播(Unicast)场景,就是1对1通信,这个时候是睡觉的理发师模型。但是在多播(Multicat)场景,模型发生了一定的变化。
如果,因为消息是给每个接收者的,因此每个接收者并不是从队列中直接取出元素进行消费。而是每个接收者需要有一套对队列操作的完整指针(记录了当前读取消息的位置),这套指针最常见的做法就是ThreadLocal技术。向队列中写入数据的时候,用的是一套指针。如果存在多个生产者,那么多个生产者竞争这套指针写入数据。但是当读取数据的时候,为每个接收者通过ThreadLocal创建一套单独的指针,这样能够从头开始消费队列中的数据,就好像队列被复制了很多份一样。
Java Blocking Queue
Java的BlockingQueue Interface下面的7种实现:
- ArrayBlockingQueue
- LinkedBlockingQueue
- LinkedBlockingDeque
- PriorityBlockingQueue
- DelayQueue
- LinkedTransferQueue
- SynchronousQueue
既然是队列,就需要支持先入先出(FIFO)的入队、出队操作。BlockingQueue Interface提供3类入队、出队操作:
- 抛异常(add/remove)
- 如果队列满了继续add会抛异常,如果队列空了继续remove会抛异常
- 非阻塞(offer/poll)
- 如果队列满了继续offer会返回null表示失败;如果队列空了继续poll会返回null表示失败
- 阻塞(put/take)
- 队列满了put会阻塞;队列空了take会阻塞
下面这段程序我们来一起尝试用一下这7个队列:
public class BlockingQueueTest {
public static void main(String[] argv) {
BlockingQueue<Integer> queue;
//queue = new ArrayBlockingQueue<Integer>(10);
//queue = new LinkedBlockingQueue<Integer>();
//queue = new LinkedBlockingDeque<>();
//queue = new PriorityBlockingQueue<>();
//queue = new LinkedTransferQueue<Integer>();
queue = new SynchronousQueue<>();
//queue = new DelayQueue<>();
// Producer
for(int i = 0; i < 100; i++) {
new Thread(() -> {
//queue.offer((int) Math.floor(Math.random()*1000));
try {
queue.put((int) Math.floor(Math.random()*1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// Consumer
for(int i = 0; i < 10; i++) {
new Thread(() -> {
while(true) {
//Integer x = queue.poll();
Integer x = null;
try {
x = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.format("Thread %d take %d\n", Thread.currentThread().getId(), x);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
除了DelayQueue不能直接使用外,其他的队列功能大同小异。可以通过offer/put添加元素,用poll/take取出元素。多数都遵循FIFO的关系,除了PriorityQueue和DelayQueue。PriorityQueue会每次取出队列中最小的元素。
自己打开要给IDE试跑下上面的程序。
然后可以把put
和take
替换程offer
和poll
。再观察下有没有什么变化?有一个比较大的区别是SynchronousQueue
出现了很多拿到null
的情况。这个现象可以等后面我们学习了SynchronousQueue的实现原理再回来看。
这些问题是我们从 API层面看到的,请大家带着这些问题学习下去。我们下面会结合场景给大家分析这些问题。
线程安全问题
BlockingQueue除了要解决形形色色的场景——发红包、发短信、web服务、RPC调用、线程池……有一个非常重要的目标就是线程安全(Thread Safe)。通常我们写程序是在单线程模型下思考的,在多线程环境下可能会发生竞争条件、饥饿等等各种各样的问题。而线程安全的类,能够在多线程下让用户(程序员)可以像单线程那样思考。比如说queue.take()
,如果是queue
是BlockingQueue,我们可以放心的在多线程环境下使用,不用加锁,不同考虑竞争条件。这样,解决线程安全问题的责任,就落到了BlockingQueue的设计者,而不是BlockingQueue的用户。
Java中java.util.concurrent
中的数据结构多数的线程安全的。但是平时我们用的集合比如说ArrayList,HashMap都不是线程安全的。另外,ThreadLocal是线程安全,比如用ThreadLocal来定义一个static
变量,自然是线程安全的,不过多线程不会共享这个变量,而是每个线程一份。最后,自己可以通过上锁,利用同步器,还有后面讲到的无锁编程,自己实现线程安全的数据结构。
应用场景举例:Web服务器
web服务是一个多生产者+多消费者模型,队列满了会拒绝服务(Deny Of Service),著名的DDOS攻击,就是针对网络通信的缓冲区。
这个场景中,经常我们会用到***BlockingQueue的结构。比如ArrayBlockingQueue,ListedBlockingQueue。List实现的队列扩容更容易,List本身是一个跳跃结构。Array版本的队列只支持有界(bounded)的队列,一开始就需要设置好容量。List的版本支持有界(一开始设置容量),也支持无界。从数据结构本身分析,Array版本的Queue扩容成本更高。Array的扩容可能会触发把Array从内存中一个地方拷贝到另一个地方。当然,Array版本也有优势,Array版本索引成本更低。获取第k个元素在Array中只需要O(1)的操作,而在链表中却需要O(n)的操作。
值得一提的是,List版本的Queue可以同时支持有界(bounded)和无界(unbounded)两种模式。通常建议,如果没有需要直接索引元素的需求,可以考虑用List版本性能更好。
undo问题
很多软件需要实现撤销(undo)能力。浏览器的回退(back)操作,也可以看做是一种undo能力。
假设一个文本编辑器把所有用户的行为(Action)存入一个队列。那么每次用户发生某个行为,就需要入队。队列通常设计成有界的,比如最多只帮用户存10000个操作历史。上图中新操作从右边入队得到记录,超出足够数目(10000)后,操作从左边删除。那么,当我们需要实现undo功能的时候,实际上要从右边删除操作。undo是undo最近的操作,这个时候就需要Deque。
如果上述的操作发生在一个并发环境(多个线程),那么就可以考虑用阻塞队列版本的双向队列,也就是——LinkedBlockingQueue。
双向队列本身没有方向,因此相比虽然poll/take/put/offer可以用,但是通常我们会用下面这几个API:
- putLast/putFirst
- takeLast/takeFirst
- offerLast/offerFirst
- pollLast/pollFirst
First代表一端,Last代表另一端。
优先级调度算法
在操作系统、离线任务管理框架、线程池等等领域,都需要调度。按照任务的优先级(通常是一个整数)来调度任务,优先级高的先执行,优先级低的等待高优任务执行完后执行,是非常常见的一种调度手段。
元素无序进入优先级队列,然后可以按照优先级有序的取出。这里会用到一种叫做Heap的数据结构。
如图:假设我们的算法规定,优先级数字越小优先级越高。那么我们可以在内存中形成一个树状结构,父级元素总是比子级元素优先级高。这个结构虽然是树状结构,但是可以用数组表示,就是堆(Heap)。每个新元素插入Heap,都需要一定的操作维护Heap的性质——父级元素比自己元素值小。优先级队列的出队操作,每次都从Heap的顶部拿走一个元素,然后Heap自己会重新恢复到Heap的状态。
以下是删除最小元素的演示。
以下是插入新元素过程的演示。
Java为了支持多种需要将元素按照优先级提取,提供了优先级队列PriorityQueue。而PriorityBlockingQueue可以看做是一个线程安全的PriorityQueue。
线程安全,就是在多线程环境可以安全使用,不用考虑临界区保护等等。
延迟队列
在高并发场景下,我们经常需要控制流速。当一个批处理任务的引擎,一次性收到大量的任务。比如说一个对营销活动支持的引擎,突然收到大量的活动券发放工作。这个时候,就需要控制任务执行的流速。
为什么需要控制流速?如果任务大量来临的时候,我们确保任务都有线程执行。那么如果任务量实在太大,就会打破机器能够承受并发线程的临界点。打破临界点的代价包括内存频繁切换——内存不够用,频繁使用磁盘保存内存中的数据。还有比如频繁线程切换造成的线程执行速度下降,线程持有的资源释放缓慢导致任务堆积等等。所以,在操作系统创建了超过自身承受能力的线程数目时,我们看到的将不会是性能的缓慢下降,而是一种类似雪崩的场景。
因此,我们需要控制流速。这个时候一种非常巧妙的方法就是延迟队列。当流量过大,还有2种常见的方案,第一种是支持反向压力(backpressure),其实就是阻塞任务的提交者。这个方案,我们会在下面讨论。 另一个方案是访问拒绝(Deny Of Service),拒绝任务的提交者。
- 延迟队列适合对任务执行延迟容忍度较高的应用(比如数据分析)
- 反向压力通常还会伴随着更多资源的分配,是一种动态的方案,请看后续的讨论
- 访问拒绝适合实时性要求较高的应用,比如web服务
有几个场景比较适合延迟队列,一个是大量计时器的场景。比如说每个订单完成后要给用户发送一封邮件、短信。这些任务被延迟计算,在合适的时间发出去。这样的做法就比使用大量定时器划算(定时器消耗CPU时间较大,需要设置中断和时钟等等)。再比如大量重试发送消息的场景,需要给大量用户发送短信,部分发送失败的可能会以指数补偿的形式进行重试(例如重试Tick:1s,2s,4s,8s……)。
下面是一个DelayQueue使用的示例:
public class DelayQueueTest {
static class DelayedInteger implements Delayed {
int value;
long time = 0;
public DelayedInteger(int v, long delay){
this.value = v;
this.time = delay + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int)(this.time - ((DelayedInteger)o).time);
}
@Override
public String toString() {
return "DelayedInteger{" +
"value=" + value +
", time=" + time+
'}';
}
}
static DelayQueue<DelayedInteger> queue = new DelayQueue<>();
public static void main(String[] argv) {
new Thread(() -> {
for(int i = 0; i < 1000; i++){
System.out.println("offer:" + i);
queue.offer(new DelayedInteger(i, i*1000));
}
}).start();
new Thread(() -> {
while(true) {
try {
var item = queue.take();
System.out.println(item.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
线程池技术
线程池也是一种非常需要BlockingQueue的方案。线程池的目标是复用线程执行任务。当用户有大量的任务需要执行的时候,线程池会决定任何以何种顺序执行。
线程池有一开始内部线程数目就固定的类型(Fixed),也有动态计算需要线程数目的线程池(Cached)。如下图:对于动态决定线程数目的一个算法就是当有任务没有线程消费(处理)的时候,就创建新的线程。
这里,我们可以使用一种叫做反向施压的技术(backpresure)。通常用有界缓冲区实现的队列会在消费者忙不过来的时候被填满,而生产者会一直工作到缓冲区满之后被阻塞或者拒绝。也就是生产者在把队列填满的时候才感受到反向传来的压力,将自己阻塞或者被拒绝。这种情况我们只需要使用正常的有界队列即可,生产者向队列中新增任务,消费者从队列中取出任务执行。
另一种方案是让生产者时时刻刻都感受到来自反向的压力,也就是只有当任务有消费者消费的情况下才允许生产。这种情况,我们考虑把消费者自己放入队列,然后生产者生产的时候从队列中取出消费者。如果生产者生产的某个任务可以取到消费者,那么就可以被执行。当然,如果某次生产,没有取到消费者(消费线程),可以创建多一个消费线程执行这个任务。
上面我们构造一个最基本的思路,但是难就难在,我们希望用户使用的时候接口还是基于task的。这个时候,我们可以考虑一种双向队列的数据结构。
在这种数据结构中,生产者消费者都可以向队列中放入元素,只不过是不同的颜色。生产者放入的是红色的元素,消费者放入的是绿色的元素。每次生产者加入队列,都看一下队列尾部有没有可以匹配的消费者。每次消费者加入队列都看一下队列头部有没有可以匹配的生产者。队列本身可以考虑用单向链表实现——这样维护队列就可以用cas操作。大家可以回忆上上一节讲到的CLH队列。
对一个双向队列,如果生产者先生产,那么会因为没有足够的消费者而阻塞生产。如果消费者先消费,那么会因为没有足够的生产者而阻塞消费。
另外还有一种基于栈的实现,生产者和消费者从同方向想栈(链表)中加入元素。每加入一种颜色的元素,就马上观察栈顶(最左边)是否有相匹配的元素。
栈的实现同样实现了生产者、消费者之间互为转化的关系。因此大家在阅读SynchronizeQueue和LinkedTransferQueue的时候,会发现内部都实现了一种叫做transfer
的操作。transfer
这个单词就是转让的意思。这里指元素被生产者转让(transfer)给了消费者,在有足够消费者的情况下才接收转让,否则就等待有新的消费者出现。
在SynchronousQueue
和LinkedTransferQueue
中,都使用了双向队列(栈)数据结构,巧妙的帮助我们解决了线程池最底层的两个基本操作:
- 生产者等待消费者足够再生产
- 消费者等待生产者足够再消费
最后我提一句,用队列实现的方式是朴素公平的,队列具有FIFO性质。 用栈实现的方式不公平,是FILO性质,但是性能更好。
接下来我会用一个程序来带你学会使用SynchronousQueue和LinkedTransferQueue。我们一起来实现一个基于线程池技术的调度器,当然写线程池也是我们的一个目标。这个调度器+线程池程序会尝试先创N个线程用于初始化执行,然后当线程不够用的时候,线程池会尝试再逐渐继续创建线程。
public class Scheduler {
// SynchronousQueue<Runnable> tasks = new SynchronousQueue<>();
LinkedTransferQueue<Runnable> tasks = new LinkedTransferQueue<>();
static AtomicInteger idCount = new AtomicInteger(0);
int maxWorkers;
public Scheduler(int maxWorkers) {
this.maxWorkers = maxWorkers;
for(int i = 0; i < maxWorkers; i++) {
new Thread(new Worker()).start();
}
}
class Worker implements Runnable {
int id;
public Worker(){
this.id = idCount.getAndIncrement();
}
@Override
public void run() {
while(true) {
Runnable runnable = null;
try {
runnable = tasks.take();
runnable.run();
System.out.format("work done by id=%d\n", id);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void submit(Runnable r) throws InterruptedException {
while(!tasks.tryTransfer(r)) {
//while(!tasks.offer(r)) {
Thread.onSpinWait();
new Thread(new Worker()).start();
}
}
public static void main(String[] argv) throws InterruptedException {
var scheduler = new Scheduler(10);
for(int i = 0; i < 1000; i++) {
var localI = i;
Thread.sleep(1);
scheduler.submit(() -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
上面程序用SynchronousQueue和LinkedTrnasferQueue实现了一个支持反向压力创建线程的调度器(线程池)。上面程序中,LinkedTrnasferQueue.tryTransfer和SynchronousQueue.offer的能力类似——如果没有足够消费者生产者就会失败。当然,相比SynchronousQueue的offer方法,tryTransfer
能力更强,还可以支持等待时间——tryTransfer(wait)
。
如果你去翻阅两个程序的源代码SynchronousQueue
和LinkedTrnasferQueue
,你会发现内部实现有大同小异之处。就是我之前给大家讲的DualQueue、DualStack的实现。构造函数,SynchronousQueue(fair),如果传入false,那么会改用Stack实现,默认是Queue。LinkedTransferQueue只有DualQueue的实现。
面试题:说说SynchronousQueue和LinkedTransferQueue的区别?
它们的能力相似,都提供了transfer的能力——将元素转让给消费者,如果没有足够的消费者就等待或拒绝。这是一种传达反向压力的策略,通过这种方式可以监控消费者是否足够决定后续的策略,比如说增加消费者或者让生产者休眠。
具体列举下异同点:
- 都基于双向(Dual)数据结构。区别:SynchronousQueue可以选择用双向队列或双向栈,FIFO是公平的,FILO是不公平的。LinkedTransferQueue只有队列的实现。
- 都继承于BlockingQueue接口。区别:SynchronousQueue的offer方法,类似LinkedTransferQueue的tryTransfer方法;LinkedTransferQueue的offer方法,类似传统队列比如ArrayBlockingQueue的offer方法。因此,LinkedTransferQueue同时兼容了双向队列和单向队列。
面试题:说说ArrayBlockingQueue和SynchronousQueue最大的区别?
ArrayBlockingQueue不能够实现没有足够的消费者就阻塞生产者的逻辑。 ArrayBlockingQueue只能在队列已经填满的情况下阻塞。也就是SynchronousQueue可以用来实现生产者等待消费者接收元素的逻辑。比如,消息处理场景等待接收方接收消息的场景;再比如线程池中提交任务等待有线程处理的场景。
面试题:什么是有界队列、什么是无界队列?
有界,就是元素个数是限定的。比如ArrayBlockingQueue的构造函数,一开始就需要输入数量,这个就是有界的。一旦队列中的元素达到界限,那么就会阻塞或拒绝生产者(向队列写入元素的线程)。
无界,就是元素个数可以无限增长。比如LinkedTransferQueue
,SynchrounousQueue
,s 和DelayQueue
,这4个队列是无界的。
另外,LinkedBlockingQueue
和LinkedBlockingDeque
有界、无界是可选的。当构造函数中没有参数,它们是无界的;当构造函数中有数字,他们是有界的。
if
面试题:LinkedBlockingDeque是一种双向队列,那么它和SynchronousQueue和LinkedTransferQueue中实现的双向有什么区别?
本质上它们的实现都是双向数据结构,但是算法不同。LinkedBlockingQueue内部是一个双向链表,支持双向插入、删除元素。这种实现因为缺少了类似DualQueue,DualStack中match算法的能力。
图中是一个DualQueue,必须生产者的生产(红色)和消费者的消费(绿色)匹配才能够同时删除两个元素。这样的设计可以同时做到2点:
- 生产的元素消费不完时,将元素在队列中等待消费(排队)
- 消费者不足时给生产者反向压力,阻塞生产者(匹配)
LinkedBlockingQueue只能做到第1点。实现匹配,就需要DualQueue。
总结
本节课讨论了Java中的7中BlockingQueue,内容较长。大家一定要抓住场景。所有问题的核心是因为有这样的需求,有了需求才有动力去创造形形色色的数据结构解决这些需求。发邮件、发短信、下载队列、发红包、操作历史维护undo/redo、RPC、Web服务、传输日志、线程池、Nginx的队列、Redis的队列——这些都是生产者消费者。
有界队列非常适合实时性要求较高(需要拒绝服务)的场景,比如说Web请求,如果生产太快,为了保护服务器本身最好的方法就是拒绝一部分服务。为什么可以拒绝服务?这是因为用户等不起。用户等待1s跳出率10%, 等待2s就有30%,等待10s,就90%跳出了。等待20s?没人用了。这种情况下,队列中能存储大量的元素反而变得没有意义。反而是有界队列(或者有界缓冲区)非常有用,一旦缓冲区满,超出服务器的处理能力,就DOS(Deny Of Service)一部分的请求,保证另一部分请求正常被处理。
无界队列适合对实时性要求不那么敏感的场景,比如说下载大量文件,发大量红包和短信,这些场景,就可以考虑用无界队列。无界队列更关心事情都要做完,而不是事情必须在限定的时效内做完。
学习了这堂课,希望大家以后使用队列的过程中,遇到每个场景的时候都思考下,我这样的队列设计是不是最优的?还有没有更好的方案?保持一颗追求的心。
下一节课,我们将学习无锁编程(Lock-Free Programming)。