Skip to content

面试官:LinkedBlockingDeque和SynchronousQueue工作原理一样吗?

========

面试官:LinkedBlockingDeque和SynchronousQueue工作原理一样吗?

求职者:对不起,队列太多,我记不得了……

经过之前的学习,我们发现队列真的是一种非常高效的数据结构。而世界上很多的问题,都可以被抽象成入队、出队的操作——更加准确的说:生产者、消费者问题应用场景非常的多。从实践的角度,我们不会经常使用同步器,而是使用Java语言在同步器基础之上再封装的结构,其中阻塞队列(BlockingQueue)就是我们平时最常用的一类数据结构。构造业务逻辑,处理批量任务,处理线程池,我们都需要掌握阻塞队列。

这节课,我会为大家介绍7种继承于BlockingQueue的阻塞队列,以及他们的应用场景。

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • LinkedBlockingDeque
  • PriorityBlockingQueue
  • DelayQueue
  • LinkedTransferQueue
  • SynchronousQueue

打破传统的学习方式,我不会按照1-7的顺序,如念经般给你逐一介绍;而是,我会从问题入手,从不同的场景切入,带你思考不同的方案。

本节关联的面试题目:

  1. ArrayBlockingQueue和LinkedBlockingQueue的区别?
  2. add/offer/put的区别?remove/poll/take的区别?
  3. DelayQueue的应用场景是什么?
  4. SynchronousQueue的应用场景是什么?
  5. LinkedTransferQueue和SynchronousQueue有什么异同?
  6. LinkedBlockingDeque和SynchronousQueue工作原理一样吗?

生产者消费者问题

在并发编程中,生产者、消费者问题,是一类非常重要的问题;也是BlockingQueue的一大应用场景。生产者消费者问题由著名的计算机科学家Dijkstra提出,是一个经典的多处理问题(multi-processing)。生产者产生数据,消费者消费数据。 中间用一个有界限的缓冲区连接。

image-20201122151216919

缓冲区通常是有界限的(有大小限制),因此也称称作:有限缓冲区问题(bounded-buffer problem)。但是随着时代发展,缓冲区开始支持自动扩容,尤其是链表这种跳跃数据结构,允许节点在内存分配上不连续,扩容的成本非常低。因此我们也会使用无界缓冲区(unbounded buffer)。

睡觉的理发师问题

下图是睡觉的理发师,是一种经典的有界缓冲区生产者消费者问题:

image-20201122151620677

顾客进入理发店后,现在等待坐席上等待。睡觉的理发师发现有顾客来到,被唤醒,优先处理等待时间最长的顾客。理完发的顾客离开理发店。如果等待坐席满了,新顾客就不会再进来。在这个故事中:

  • 理发师是消费者(Consumer)
  • 顾客不断到来这个显现是生产者(Producer)
  • 没顾客的时候理发师休眠(消费者在等待队列空时休眠)
  • 坐满了顾客不再进入(生产者在等待队列满的情况下休眠)

思考:程序怎么写?

下面程序(伪代码),用itemCount 代表缓冲区中元素的个数。生产过程中生产者用produceItem 生产数据,然后用putItemIntoBuffer 将元素放入缓冲区。消费者用remoteItemFromBuffer 取出缓冲区的元素,然后减少itemCount

如果缓冲区满了itemCount == BUFFER_SIZE ,生产者休眠。如果缓冲区空了itemCount == 0 消费者休眠。当开始生产时,itemCount=1,唤醒消费者。 当开始消费时,itemCount == BUFFER_SIZE_1 时,唤醒生产者。

int itemCount = 0;
void producer() 
{
    while (true) 
    {
        item = produceItem();
        if (itemCount == BUFFER_SIZE) 
            sleep();
        putItemIntoBuffer(item);
        itemCount = itemCount + 1;
        if (itemCount == 1) 
            wakeup(consumer);
    }
}

void consumer() 
{
    while (true) 
    {
        if (itemCount == 0) 
            sleep();

        item = removeItemFromBuffer();
        itemCount = itemCount - 1;

        if (itemCount == BUFFER_SIZE - 1) 
            wakeup(producer);

        consumeItem(item);
    }
}

这个程序有个致命的缺陷,这个程序没有考虑到竞争条件——会导致死锁。死锁就是多个线程互相等待,这里就是生产者等待消费者唤醒,消费者等待生产者唤醒。

假设缓冲区大小是100, 当itemCount==100 时,生产者本来要休眠。但是如果sleep执行前触发了线程切换,消费者开始执行。 消费者从缓冲区取出元素,然后尝试唤醒生产者。生产者线程收到唤醒信号,醒来,然后执行下一句sleep 进入休眠。消费者继续执行,把缓冲区全部清空,也陷入休眠。这样,就构成了死锁。

为了解决死锁问题,可以考虑用synchronized 关键字或者锁,对producerconsumer 方法上锁,但是这样锁的范围太大。

另一种,就是对所有读写操作itemCount 的位置分别上锁,但是这样程序太复杂。比如下面这样,需要处理的个例太多。

synchronized(...) {
    if (itemCount == 0) 
        sleep();
}

因此,通常我们考虑用Semaphore来解决这个问题。Semaphore提供了一对原子操作up down 。生产过程中将空闲数(emptyCount)减1,然后将填充数(fillCount)加1。消费过程将fillCount减1,将emptyCount加1。如果缓冲区满了,emptyCount=1 ,这个时候down操作会导致生产者休眠。如果缓冲区空了,fillCount=0,down操作会导致消费者休眠。

int fillCount = 0; // items produced
int emptyCount = BUFFER_SIZE; // remaining space

void producer() 
{
    while (true) 
    {
        item = produceItem();
        down(emptyCount);
        putItemIntoBuffer(item);
        up(fillCount);
    }
}

void consumer() 
{
    while (true) 
    {
        down(fillCount);
        item = removeItemFromBuffer();
        up(emptyCount);
        consumeItem(item);
    }
}

上面操作updown 操作都是基于CAS的原子操作,updown 之间允许多个线程同时进入。只要fillCount 或者emptyCount 不为0。removeItemFromBufferputItemIntoBuffer 可能需要单独上锁,保证互斥,具体背后操作缓冲区逻辑我这里没有给出。

上面的up和down方法是操作系统提供semaphore的习惯。Java里这两个操作叫做acquire和release。

最后还有一种方法是利用条件变量——Condition。具体的程序在4.4上实现ProducerCustomerModel 时我们已经给出,感兴趣的朋友可以参考。

思考:如何更优雅?上面程序从面向对象角度看还有没有更好的设计?

上面的程序在使用队列时,需要考虑很多同步细节(这里用了Semaphore)。因为使用生产者消费者是一个高频场景,因此可以考虑封装。将同步能力封装进入队列中,这就是Java提供的BlockingQueue Interface,以及它的7种实现。

生产者、消费者模型的用途

在多线程环境中,生产者消费者模型,往往用队列(Queue)实现。队列的用途有很多,比如说web服务,比如说消息队列。

image-20201122155130571

如图,web服务的模型非常类似睡觉的理发师。 只不过从单生产者、单消费者模型,变成了多生产者、多消费者模型。因此队列的**入队(enqueue)出队(dequeue)**操作需要考虑竞争条件。复习一下,对入队、出队上锁是一种最常见的同步方法,也就是互斥(Mutual Exclusion)——对临界区上锁。比上锁并发度更高的做法,是用少数几个CAS操作,比如上节课我们学习的CHL链表。CAS+自旋判断的逻辑,如果能够在少数几个自旋判断中获得临界区的控制权限,那么这个过程将会比锁速度快。

另一个常见的用途是消息队列(Message Queue)。在单播(Unicast)场景,就是1对1通信,这个时候是睡觉的理发师模型。但是在多播(Multicat)场景,模型发生了一定的变化。

image-20201122155709773

如果,因为消息是给每个接收者的,因此每个接收者并不是从队列中直接取出元素进行消费。而是每个接收者需要有一套对队列操作的完整指针(记录了当前读取消息的位置),这套指针最常见的做法就是ThreadLocal技术。向队列中写入数据的时候,用的是一套指针。如果存在多个生产者,那么多个生产者竞争这套指针写入数据。但是当读取数据的时候,为每个接收者通过ThreadLocal创建一套单独的指针,这样能够从头开始消费队列中的数据,就好像队列被复制了很多份一样。

Java Blocking Queue

Java的BlockingQueue Interface下面的7种实现:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • LinkedBlockingDeque
  • PriorityBlockingQueue
  • DelayQueue
  • LinkedTransferQueue
  • SynchronousQueue

既然是队列,就需要支持先入先出(FIFO)的入队、出队操作。BlockingQueue Interface提供3类入队、出队操作:

  1. 抛异常(add/remove)
    • 如果队列满了继续add会抛异常,如果队列空了继续remove会抛异常
  2. 非阻塞(offer/poll)
    • 如果队列满了继续offer会返回null表示失败;如果队列空了继续poll会返回null表示失败
  3. 阻塞(put/take)
    • 队列满了put会阻塞;队列空了take会阻塞

下面这段程序我们来一起尝试用一下这7个队列:

public class BlockingQueueTest {


    public static void main(String[] argv) {
        BlockingQueue<Integer> queue;
        //queue = new ArrayBlockingQueue<Integer>(10);
        //queue = new LinkedBlockingQueue<Integer>();
        //queue = new LinkedBlockingDeque<>();
        //queue = new PriorityBlockingQueue<>();
        //queue = new LinkedTransferQueue<Integer>();
        queue = new SynchronousQueue<>();
        //queue = new DelayQueue<>();

        // Producer
        for(int i = 0; i < 100; i++) {
            new Thread(() -> {
                //queue.offer((int) Math.floor(Math.random()*1000));
                try {
                    queue.put((int) Math.floor(Math.random()*1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // Consumer
        for(int i = 0; i < 10; i++) {
            new Thread(() -> {
                while(true) {
                    //Integer x = queue.poll();
                    Integer x = null;
                    try {
                        x = queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.format("Thread %d take %d\n", Thread.currentThread().getId(), x);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

    }
}

除了DelayQueue不能直接使用外,其他的队列功能大同小异。可以通过offer/put添加元素,用poll/take取出元素。多数都遵循FIFO的关系,除了PriorityQueue和DelayQueue。PriorityQueue会每次取出队列中最小的元素。

自己打开要给IDE试跑下上面的程序。

然后可以把puttake 替换程offerpoll 。再观察下有没有什么变化?有一个比较大的区别是SynchronousQueue出现了很多拿到null 的情况。这个现象可以等后面我们学习了SynchronousQueue的实现原理再回来看。

这些问题是我们从 API层面看到的,请大家带着这些问题学习下去。我们下面会结合场景给大家分析这些问题。

线程安全问题

BlockingQueue除了要解决形形色色的场景——发红包、发短信、web服务、RPC调用、线程池……有一个非常重要的目标就是线程安全(Thread Safe)。通常我们写程序是在单线程模型下思考的,在多线程环境下可能会发生竞争条件、饥饿等等各种各样的问题。而线程安全的类,能够在多线程下让用户(程序员)可以像单线程那样思考。比如说queue.take() ,如果是queue 是BlockingQueue,我们可以放心的在多线程环境下使用,不用加锁,不同考虑竞争条件。这样,解决线程安全问题的责任,就落到了BlockingQueue的设计者,而不是BlockingQueue的用户。

Java中java.util.concurrent 中的数据结构多数的线程安全的。但是平时我们用的集合比如说ArrayList,HashMap都不是线程安全的。另外,ThreadLocal是线程安全,比如用ThreadLocal来定义一个static 变量,自然是线程安全的,不过多线程不会共享这个变量,而是每个线程一份。最后,自己可以通过上锁,利用同步器,还有后面讲到的无锁编程,自己实现线程安全的数据结构。

应用场景举例:Web服务器

web服务是一个多生产者+多消费者模型,队列满了会拒绝服务(Deny Of Service),著名的DDOS攻击,就是针对网络通信的缓冲区。

image-20201122160402735

这个场景中,经常我们会用到***BlockingQueue的结构。比如ArrayBlockingQueue,ListedBlockingQueue。List实现的队列扩容更容易,List本身是一个跳跃结构。Array版本的队列只支持有界(bounded)的队列,一开始就需要设置好容量。List的版本支持有界(一开始设置容量),也支持无界。从数据结构本身分析,Array版本的Queue扩容成本更高。Array的扩容可能会触发把Array从内存中一个地方拷贝到另一个地方。当然,Array版本也有优势,Array版本索引成本更低。获取第k个元素在Array中只需要O(1)的操作,而在链表中却需要O(n)的操作。

值得一提的是,List版本的Queue可以同时支持有界(bounded)和无界(unbounded)两种模式。通常建议,如果没有需要直接索引元素的需求,可以考虑用List版本性能更好。

undo问题

很多软件需要实现撤销(undo)能力。浏览器的回退(back)操作,也可以看做是一种undo能力。

image-20201122161125719

假设一个文本编辑器把所有用户的行为(Action)存入一个队列。那么每次用户发生某个行为,就需要入队。队列通常设计成有界的,比如最多只帮用户存10000个操作历史。上图中新操作从右边入队得到记录,超出足够数目(10000)后,操作从左边删除。那么,当我们需要实现undo功能的时候,实际上要从右边删除操作。undo是undo最近的操作,这个时候就需要Deque。

如果上述的操作发生在一个并发环境(多个线程),那么就可以考虑用阻塞队列版本的双向队列,也就是——LinkedBlockingQueue。

双向队列本身没有方向,因此相比虽然poll/take/put/offer可以用,但是通常我们会用下面这几个API:

  • putLast/putFirst
  • takeLast/takeFirst
  • offerLast/offerFirst
  • pollLast/pollFirst

First代表一端,Last代表另一端。

优先级调度算法

在操作系统、离线任务管理框架、线程池等等领域,都需要调度。按照任务的优先级(通常是一个整数)来调度任务,优先级高的先执行,优先级低的等待高优任务执行完后执行,是非常常见的一种调度手段。

元素无序进入优先级队列,然后可以按照优先级有序的取出。这里会用到一种叫做Heap的数据结构。

image-20201122161742880

如图:假设我们的算法规定,优先级数字越小优先级越高。那么我们可以在内存中形成一个树状结构,父级元素总是比子级元素优先级高。这个结构虽然是树状结构,但是可以用数组表示,就是堆(Heap)。每个新元素插入Heap,都需要一定的操作维护Heap的性质——父级元素比自己元素值小。优先级队列的出队操作,每次都从Heap的顶部拿走一个元素,然后Heap自己会重新恢复到Heap的状态。

以下是删除最小元素的演示。

image-20201220102536309

以下是插入新元素过程的演示。

image-20201220102944428

Java为了支持多种需要将元素按照优先级提取,提供了优先级队列PriorityQueue。而PriorityBlockingQueue可以看做是一个线程安全的PriorityQueue。

线程安全,就是在多线程环境可以安全使用,不用考虑临界区保护等等。

延迟队列

在高并发场景下,我们经常需要控制流速。当一个批处理任务的引擎,一次性收到大量的任务。比如说一个对营销活动支持的引擎,突然收到大量的活动券发放工作。这个时候,就需要控制任务执行的流速。

为什么需要控制流速?如果任务大量来临的时候,我们确保任务都有线程执行。那么如果任务量实在太大,就会打破机器能够承受并发线程的临界点。打破临界点的代价包括内存频繁切换——内存不够用,频繁使用磁盘保存内存中的数据。还有比如频繁线程切换造成的线程执行速度下降,线程持有的资源释放缓慢导致任务堆积等等。所以,在操作系统创建了超过自身承受能力的线程数目时,我们看到的将不会是性能的缓慢下降,而是一种类似雪崩的场景。

因此,我们需要控制流速。这个时候一种非常巧妙的方法就是延迟队列。当流量过大,还有2种常见的方案,第一种是支持反向压力(backpressure),其实就是阻塞任务的提交者。这个方案,我们会在下面讨论。 另一个方案是访问拒绝(Deny Of Service),拒绝任务的提交者。

  • 延迟队列适合对任务执行延迟容忍度较高的应用(比如数据分析)
  • 反向压力通常还会伴随着更多资源的分配,是一种动态的方案,请看后续的讨论
  • 访问拒绝适合实时性要求较高的应用,比如web服务

有几个场景比较适合延迟队列,一个是大量计时器的场景。比如说每个订单完成后要给用户发送一封邮件、短信。这些任务被延迟计算,在合适的时间发出去。这样的做法就比使用大量定时器划算(定时器消耗CPU时间较大,需要设置中断和时钟等等)。再比如大量重试发送消息的场景,需要给大量用户发送短信,部分发送失败的可能会以指数补偿的形式进行重试(例如重试Tick:1s,2s,4s,8s……)。

下面是一个DelayQueue使用的示例:

public class DelayQueueTest {

    static class DelayedInteger implements Delayed {

        int value;
        long time = 0;

        public DelayedInteger(int v, long delay){
            this.value = v;
            this.time = delay + System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }

        @Override
        public int compareTo(Delayed o) {
            return (int)(this.time - ((DelayedInteger)o).time);
        }

        @Override
        public String toString() {
            return "DelayedInteger{" +
                "value=" + value +
                ", time=" + time+
                '}';
        }
    }

    static DelayQueue<DelayedInteger> queue = new DelayQueue<>();

    public static void main(String[] argv) {


        new Thread(() -> {
            for(int i = 0; i < 1000; i++){
                System.out.println("offer:" + i);
                queue.offer(new DelayedInteger(i, i*1000));
            }
        }).start();

        new Thread(() -> {
            while(true) {
                try {
                    var item = queue.take();
                    System.out.println(item.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

线程池技术

线程池也是一种非常需要BlockingQueue的方案。线程池的目标是复用线程执行任务。当用户有大量的任务需要执行的时候,线程池会决定任何以何种顺序执行。

image-20201217164640780

线程池有一开始内部线程数目就固定的类型(Fixed),也有动态计算需要线程数目的线程池(Cached)。如下图:对于动态决定线程数目的一个算法就是当有任务没有线程消费(处理)的时候,就创建新的线程。

image-20201217165108965

这里,我们可以使用一种叫做反向施压的技术(backpresure)。通常用有界缓冲区实现的队列会在消费者忙不过来的时候被填满,而生产者会一直工作到缓冲区满之后被阻塞或者拒绝。也就是生产者在把队列填满的时候才感受到反向传来的压力,将自己阻塞或者被拒绝。这种情况我们只需要使用正常的有界队列即可,生产者向队列中新增任务,消费者从队列中取出任务执行。

image-20201217212859270

另一种方案是让生产者时时刻刻都感受到来自反向的压力,也就是只有当任务有消费者消费的情况下才允许生产。这种情况,我们考虑把消费者自己放入队列,然后生产者生产的时候从队列中取出消费者。如果生产者生产的某个任务可以取到消费者,那么就可以被执行。当然,如果某次生产,没有取到消费者(消费线程),可以创建多一个消费线程执行这个任务。

image-20201217212903798

上面我们构造一个最基本的思路,但是难就难在,我们希望用户使用的时候接口还是基于task的。这个时候,我们可以考虑一种双向队列的数据结构。

image-20201218020637262

image-20201218020932971

在这种数据结构中,生产者消费者都可以向队列中放入元素,只不过是不同的颜色。生产者放入的是红色的元素,消费者放入的是绿色的元素。每次生产者加入队列,都看一下队列尾部有没有可以匹配的消费者。每次消费者加入队列都看一下队列头部有没有可以匹配的生产者。队列本身可以考虑用单向链表实现——这样维护队列就可以用cas操作。大家可以回忆上上一节讲到的CLH队列。

对一个双向队列,如果生产者先生产,那么会因为没有足够的消费者而阻塞生产。如果消费者先消费,那么会因为没有足够的生产者而阻塞消费。

另外还有一种基于栈的实现,生产者和消费者从同方向想栈(链表)中加入元素。每加入一种颜色的元素,就马上观察栈顶(最左边)是否有相匹配的元素。

image-20201218021125182

栈的实现同样实现了生产者、消费者之间互为转化的关系。因此大家在阅读SynchronizeQueue和LinkedTransferQueue的时候,会发现内部都实现了一种叫做transfer 的操作。transfer 这个单词就是转让的意思。这里指元素被生产者转让(transfer)给了消费者,在有足够消费者的情况下才接收转让,否则就等待有新的消费者出现。

SynchronousQueueLinkedTransferQueue 中,都使用了双向队列(栈)数据结构,巧妙的帮助我们解决了线程池最底层的两个基本操作:

  1. 生产者等待消费者足够再生产
  2. 消费者等待生产者足够再消费

最后我提一句,用队列实现的方式是朴素公平的,队列具有FIFO性质。 用栈实现的方式不公平,是FILO性质,但是性能更好。

接下来我会用一个程序来带你学会使用SynchronousQueue和LinkedTransferQueue。我们一起来实现一个基于线程池技术的调度器,当然写线程池也是我们的一个目标。这个调度器+线程池程序会尝试先创N个线程用于初始化执行,然后当线程不够用的时候,线程池会尝试再逐渐继续创建线程。

public class Scheduler {

//    SynchronousQueue<Runnable> tasks = new SynchronousQueue<>();
    LinkedTransferQueue<Runnable> tasks = new LinkedTransferQueue<>();

    static AtomicInteger idCount = new AtomicInteger(0);

    int maxWorkers;

    public Scheduler(int maxWorkers) {
        this.maxWorkers = maxWorkers;
        for(int i = 0; i < maxWorkers; i++) {
            new Thread(new Worker()).start();
        }
    }

    class Worker implements Runnable {

        int id;
        public Worker(){
            this.id = idCount.getAndIncrement();
        }

        @Override
        public void run() {
            while(true) {
                Runnable runnable = null;
                try {
                    runnable = tasks.take();
                    runnable.run();
                    System.out.format("work done by id=%d\n", id);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    public void submit(Runnable r) throws InterruptedException {
        while(!tasks.tryTransfer(r)) {
        //while(!tasks.offer(r)) {
            Thread.onSpinWait();
            new Thread(new Worker()).start();
        }
    }

    public static void main(String[] argv) throws InterruptedException {
        var scheduler = new Scheduler(10);

        for(int i = 0; i < 1000; i++) {
            var localI = i;
            Thread.sleep(1);
            scheduler.submit(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

}

上面程序用SynchronousQueue和LinkedTrnasferQueue实现了一个支持反向压力创建线程的调度器(线程池)。上面程序中,LinkedTrnasferQueue.tryTransfer和SynchronousQueue.offer的能力类似——如果没有足够消费者生产者就会失败。当然,相比SynchronousQueue的offer方法,tryTransfer 能力更强,还可以支持等待时间——tryTransfer(wait)

如果你去翻阅两个程序的源代码SynchronousQueueLinkedTrnasferQueue ,你会发现内部实现有大同小异之处。就是我之前给大家讲的DualQueue、DualStack的实现。构造函数,SynchronousQueue(fair),如果传入false,那么会改用Stack实现,默认是Queue。LinkedTransferQueue只有DualQueue的实现。

面试题:说说SynchronousQueue和LinkedTransferQueue的区别?

它们的能力相似,都提供了transfer的能力——将元素转让给消费者,如果没有足够的消费者就等待或拒绝。这是一种传达反向压力的策略,通过这种方式可以监控消费者是否足够决定后续的策略,比如说增加消费者或者让生产者休眠。

具体列举下异同点:

  1. 都基于双向(Dual)数据结构。区别:SynchronousQueue可以选择用双向队列或双向栈,FIFO是公平的,FILO是不公平的。LinkedTransferQueue只有队列的实现。
  2. 都继承于BlockingQueue接口。区别:SynchronousQueue的offer方法,类似LinkedTransferQueue的tryTransfer方法;LinkedTransferQueue的offer方法,类似传统队列比如ArrayBlockingQueue的offer方法。因此,LinkedTransferQueue同时兼容了双向队列和单向队列。

面试题:说说ArrayBlockingQueue和SynchronousQueue最大的区别?

ArrayBlockingQueue不能够实现没有足够的消费者就阻塞生产者的逻辑。 ArrayBlockingQueue只能在队列已经填满的情况下阻塞。也就是SynchronousQueue可以用来实现生产者等待消费者接收元素的逻辑。比如,消息处理场景等待接收方接收消息的场景;再比如线程池中提交任务等待有线程处理的场景。

面试题:什么是有界队列、什么是无界队列?

有界,就是元素个数是限定的。比如ArrayBlockingQueue的构造函数,一开始就需要输入数量,这个就是有界的。一旦队列中的元素达到界限,那么就会阻塞或拒绝生产者(向队列写入元素的线程)。

无界,就是元素个数可以无限增长。比如LinkedTransferQueueSynchrounousQueue,s 和DelayQueue ,这4个队列是无界的。

另外,LinkedBlockingQueueLinkedBlockingDeque 有界、无界是可选的。当构造函数中没有参数,它们是无界的;当构造函数中有数字,他们是有界的。

if

面试题:LinkedBlockingDeque是一种双向队列,那么它和SynchronousQueue和LinkedTransferQueue中实现的双向有什么区别?

本质上它们的实现都是双向数据结构,但是算法不同。LinkedBlockingQueue内部是一个双向链表,支持双向插入、删除元素。这种实现因为缺少了类似DualQueue,DualStack中match算法的能力。

image-20201219112835140

图中是一个DualQueue,必须生产者的生产(红色)和消费者的消费(绿色)匹配才能够同时删除两个元素。这样的设计可以同时做到2点:

  1. 生产的元素消费不完时,将元素在队列中等待消费(排队)
  2. 消费者不足时给生产者反向压力,阻塞生产者(匹配)

LinkedBlockingQueue只能做到第1点。实现匹配,就需要DualQueue。

总结

本节课讨论了Java中的7中BlockingQueue,内容较长。大家一定要抓住场景。所有问题的核心是因为有这样的需求,有了需求才有动力去创造形形色色的数据结构解决这些需求。发邮件、发短信、下载队列、发红包、操作历史维护undo/redo、RPC、Web服务、传输日志、线程池、Nginx的队列、Redis的队列——这些都是生产者消费者。

有界队列非常适合实时性要求较高(需要拒绝服务)的场景,比如说Web请求,如果生产太快,为了保护服务器本身最好的方法就是拒绝一部分服务。为什么可以拒绝服务?这是因为用户等不起。用户等待1s跳出率10%, 等待2s就有30%,等待10s,就90%跳出了。等待20s?没人用了。这种情况下,队列中能存储大量的元素反而变得没有意义。反而是有界队列(或者有界缓冲区)非常有用,一旦缓冲区满,超出服务器的处理能力,就DOS(Deny Of Service)一部分的请求,保证另一部分请求正常被处理。

无界队列适合对实时性要求不那么敏感的场景,比如说下载大量文件,发大量红包和短信,这些场景,就可以考虑用无界队列。无界队列更关心事情都要做完,而不是事情必须在限定的时效内做完。

学习了这堂课,希望大家以后使用队列的过程中,遇到每个场景的时候都思考下,我这样的队列设计是不是最优的?还有没有更好的方案?保持一颗追求的心。

下一节课,我们将学习无锁编程(Lock-Free Programming)。

文章来源于自己总结和网络转载,内容如有任何问题,请大佬斧正!联系我