Skip to content

面试官:说6个Java的同步器?

面试官:说6个Java的同步器?

求职者:

  1. ReentrantLock
  2. Semaphore
  3. CyclicBarrier
  4. CountDownLatch
  5. Phaser
  6. Exchanger

面试官:还有么?

求职者:给我AQS,我随便写~你要几个?

大家注意,直接问XXX怎么用是比较容易回答的,只需要读过文档。但是如果问什么场景会用到XXX?就比较麻烦了,必须找到合适的场景。 还有一种比较麻烦是XXX怎么实现?必须能够说出实现原理。因为是面试课,我会重点关注在场景和实现上,帮助大家提高。

这会关联到非常多的面试题:

  1. synchronized和ReentrantLock的区别?
  2. 什么场景下需要锁可以重入?
  3. 什么是公平锁/非公平锁?
  4. 什么场景用Semaphore?
  5. 简述Phaser/CountDownLatch/CyclicBarrier三者的应用场景和实现原理?
  6. 什么场景会用到Exchanger?

ReentrantLock

Re-entrant-Lock(Re:重入,entrant:进入者, Lock:锁), 中文是可重入锁。虽然名字比较复杂,但是大家可以理解成synchronized 关键字的另一个选择,主要目的就是实现互斥。

互斥,有点类似排队的能力(见上图),当然可重入锁和synchronized 关键字有很多区别。

比如这道面试题:ReentrantLock和synchronized有什么区别和?

相同点:

  1. 都可以对临界区进行互斥保护(锁、解锁)
  2. 都可以重入
  3. 都提供线程协作(Monitor提供Object.wait/notify等,ReentrantLock提供条件变量)
  4. 都提供锁的升级逻辑和队列
    1. monitor: 偏向锁->轻量级锁->重量级锁
    2. AQS: CAS竞争->休眠+排队竞争
  5. 都提供等待队列
    1. monitor: EntrySet, WaitSet
    2. AQS : CLH队列

不同点:

  1. 基于AQS vs 基于Monitor
  2. Java生态 vs 非Java生态
  3. 响应线程中断(InterruptException) vs 不响应
  4. 提供tryLock vs 不提供
  5. 跨Block vs 单Block
  6. 可配置公平性 vs 不可配置

什么是可重入?

可重入锁的实现主要就是AQS。重写AQS的tryAcquiretryRelease 方法。因为锁是可以重入了,tryAcquire 的条件除了可以用cas 操作占有锁外,当前线程不能已经占有了锁。如果当前线程占有了锁,当前线程不会再重新获得锁,而且需要自增锁的状态。

AQS协助记录了当前线程,并且协助实现了一个整数的状态(或者将这个状态理解成锁本身)。锁本身一个整数变量就足够。不知道大家还记不记得synchronized 如何抢占锁?——复习一下,synchronized 抢占锁,比如synchronized(obj) 就是抢占obj 对象关联的 Monitor 。在Monitor 对象中写入线程的引用,就相当于抢占了锁。而引用是什么?引用是一个内存地址——还是整数。

可重入锁因为可以重入,ReentrantLock实现的tryLock 方法可以执行多次:

    var lock = new ReentrantLock();
    
    new Thread(() -> {
      lock.lock();
      lock.lock();
      // 临界区
      lock.unlock();  
      lock.unlock();
    }).start();

多次lockunlock 也需要多次。每次lock AQS的state 增1。每次unlock AQS的state 减1。lock 次数多余unlock 次数就相当于没有释放锁。因此只有将state 写为0的unlock 操作才真的释放锁。

这里有一道关联面试题:什么场景下会需要锁可以重入?

通常情况下我们没有需要多次加锁的需求。之所以提供这样的特性,是为了如果人们真的加锁了多次,程序还可以工作。——释放程序员的心智。

还有一道关联面试题:什么是公平锁、什么是非公平锁?

关于公平锁上节课已经提到过,如果对于一个资源,线程可以先到先得,就是朴素的公平锁。公平是相对的,你可以制定规则,比如优先级高的先得,不能说就不公平——但是不够朴素。 ReentrantLock中有个公平锁的配置项,就是就是朴素的公平。

    ReentrantLock(boolean fair)

构造时如果不写fair 参数默认是false 。这个是因为性能考虑,允许新加入的线程自旋竞争锁,会比从队列中取出一个再重新竞争速度更快。(具体可以参考上一节课中有关公平性的性能分析)

信号量(Semaphore)

信号量(Semaphore)核心价值是允许N个线程同时进入临界区,N可以控制。如果N=1的时候,Semaphore就转换成了Mutex(互斥锁)。如果N>1,Semaphore会控制进入临界区的线程数量。

当N>1时,Semaphore不控制竞争条件,只控制进入的线程数量,因此可以和synchronized 或者ReentrantLock 一起使用控制竞争。 Semaphore的概念,有点像多道程序设计,见上图,上图是N=2的情况。

Semaphore内部有一个状态(继承自AQS),用于标识还有多少个线程可以进入临界区。状态初始值可以在构造函数中设置。初始值=1,以为着实现Mutex。accquire 操作会减少这个状态,release 操作会增加这个状态。

下面程序帮通过Semaphore控制了并发数量最大是10。如果超过时,线程会在semaphore.acquire 处休眠,直到某个线程调用semaphore.release 释放计数。

    var semaphore = new Semaphore(10);
    var lock = new ReentrantLock();
    Runnable runnable = () -> {
        try {
            semaphore.acquire();
            // 开销较大的计算
            lock.lock();
            // 临界区
            lock.unlock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
    };
    
    for(int i = 0; i < 1000; i++) {
        new Thread(runnable).start();
    }

Semaphore 最常见的一种应用场景就是实现生产者/消费者模式。 比如下面这段程序:

    final int MAX = 100;
    Semaphore empty = new Semaphore(MAX);
    Semaphore full = new Semaphore(0);
    
    void produce() throws InterruptedException {
        empty.acquire();
        
        synchronized (queue) {
            // 操作队列
        }
        full.release();
    }
    
    void consume() throws InterruptedException {
        full.acquire();
        synchronized (queue) {
            // 操作队列
        }
        empty.release();
    }

上面的程序通过两个Semaphore控制生产者、消费者的队列中最多100个元素。 同时,当队列满时,生产者休眠;当队列空时,消费者休眠。这段程序帮助多线程环境减少了很多边界条件的判断。

用AQS实现Semaphore的部分代码如下:

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

当需要acquire 时,Semaphore先获取当前的状态avaliable ——还可以进入临界区的线程数量。然后可进入的线程数量减去需要申请进入的数量(通常是1),就可以知道接下来会剩余多少个。然后用CAS操作不断尝试抢占AQS的状态。

到这里,关于信号量我们讲完了,问几道面试题:

问:信号量的做用是什么?

答:最核心的作用就是控制并发量。

追问:具体有什么用?

答:比如用两个信号量可以实现生产者消费者模型?

追问:那为什么不用Semaphore实现ArrayBlockingQueue?

答:因为AQS底层实现的Condition(条件变量)直接基于CLH队列更高效,Semaphore是基于AQS的封装,开销略大一点。

追问:那Semaphore的价值是什么?

答:生产者消费者当然也是场景之一,用Semaphore实现生产者消费者心智负担更小,代码更好些。Semaphore写生产者消费者只需要维护简单的数学加减关系,不用手动调用signal和await。如果不在意写程序的负担,真正用的时候,也可以考虑Condition。但是通常写底层库的人才不在意程序好不好写,在性能最优程序稍微麻烦和性能比价优秀间程序稍微简单,需要选择性能最优的方案。另外,还可以用Semaphore来控制并发量,比如提供服务给其他团队用的时候,用Semaphore控制对方调用的流速。

CyclicBarrier

Barrier叫做屏障,是一种非常重要的同步元语。 Java提供了多种屏障能力,比如CyclicBarrier让线程在一个屏障上等待,然后执行同步快,然后分开执行,然后进入下一个屏障。

图中,灰色部分代表屏障,有两个线程总是在屏障上等待;绿色的部分是同步点的程序,紫色和红色代表两个线程。当然也可以是3个、4个……线程。每个周期成为一个代(generation)。

举个例子:比如有1000W条订单数据需要进程处理/分析。 每个订单数据要获取关联的商品和发货单进行分析。

可以考虑每代CyclicBarrier处理1W条订单数据,开10个线程,每个获取1000个商品信息,再开10个线程,每个获取1000个发货单信息,然后利用CyclicBarrier进行一次同步计算。

我们先弄清一些基本概念,上面这个模型每个代处理10000条订单数据,需要1000代(generation)才能完成。总共要开20个线程,20个线程必须都完成才能执行同步块,我们称为屏障中有20个合作方(party)。

上面的方法中,批量处理数据(读写数据库商品、发货单数据)的一个优势是批量操作数据库速度更快;开多线程的优势是充分利用CPU资源。具体应该如何拆分批次,如何确定线程,要以实际场景经过尝试得到的真实数据为准。

下面程序演示CyclicBarrier的工作过程,我们假设有1000页的数据需要计算。每个CyclicBarrier的代(generation)我们用两个线程处理一页数据。然后在Barrier构造函数中写一个Runnable用于同步,每处理完一次我们在第8行打印一个当页处理完成。

代码中线程执行完自己的任务就调用CyclicBarrier.await 方法,让自己休眠,等待所有线程都调用了await 方法,CyclicBarrir就会进入下一个generation,这时候await方法后面的程序技术执行。

    public class CyclicBarrierTest {
    
        CyclicBarrier barrier;
        int page = 0;
        public CyclicBarrierTest(){
            barrier = new CyclicBarrier(2, () -> {
                System.out.println("Page " + page + " Done.");
                page++;
            });
        }
    
        void prepareProducts(){
            while(page < 1000){
                //
                try {
                    Thread.sleep(20); // 模拟读取数据
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    
        void prepareDeliveryOrders(){
            while(page < 1000) {
                try {
                    Thread.sleep(10); // 模拟读取数据
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    
        void run(){
            new Thread(this::prepareProducts).start();
            new Thread(this::prepareDeliveryOrders).start();
        }
    
        public static void main(String[] argv) {
            var test = new CyclicBarrierTest();
            test.run();
        }
    
    }

相比单线程计算,上述方法可以帮助节省很多CPU、I/O资源,而且更快。

实现原理分析

CyclicBarrier内部有一个ReentrantLock用于保护计算逻辑,还有一个条件变量用于让线程们在条件变量中休眠。newCondition 创建的条件变量来自于AQS的一个内部类——ConditionObject。这个类的实现我这次没有深入讨论。条件变量的作用就是支持大量线程在一个条件变量上休眠,然后集体唤醒他们。

    /** The lock for guarding barrier entry */          
    private final ReentrantLock lock = new ReentrantLock
    /** Condition to wait on until tripped */           
    private final Condition trip = lock.newCondition();

如果一个线程调用await ,可以这样实现:

    lock.lock();
    index = 还有多少个线程就进入下一代
    if(index == 0) {
      trip.notifyAll(); // 通知所有线程继续执行  
    } else {
      trip.await(); // 让当前线程在条件变量下等待
    }
    lock.unlock();

条件变量可以用Semaphore 实现,也可以用队列,让休眠的线程记录在一个队列中,比如上节课讲的CLH算法。具体实现,大家可以参考AbstractQueuedSynchronizer的ConditionObject类。 也可以回顾下我们之前讲课用Semaphore实现生产者/消费者模型。

最后我们一起思考下CyclicBarrier解决了什么问题?CyclicBarrier提供了一套协作机制,解决的是多个线程间协作(也可以认为是通信),一起处理任务的问题。如果不提供CyclicBarrier我们可以用ReentrantLock+Condition实现类似的能力,但是CyclicBarrier覆盖的场景确实具有通用性,因此抽象成数据结构非常有价值。

CountDownLatch

CountDownLatcher

CountDownLatch的逻辑简单一些,本质上也是一个Barrier,相当于只有一代CyclicBarrier。比如说实现一个返回首页数据的服务,需要请求多个的微服务——A、B、C、D的数据。这个时候可以用CountDownLatch来形成一个同步点。

下面程序模拟了我们描述的过程,我们抽象了一个叫做RPC的任务,相当于封装一个远程RPC请求。如果4个请求需要同步,那么我们可以用CountDownLatch。每个线程获取数据后,执行一次countDown 方法,这样主线程的latch.await 方法收集够4个线程的countDown之后,就会继续执行。

    public class CountDownLatcherTest {
    
        class RPC implements Runnable{
            String url;
            CountDownLatch latch;
            public RPC(String url, CountDownLatch latch){
                this.url = url;
                this.latch = latch;
            }
    
            @Override
            public void run() {
                System.out.println("request");
                latch.countDown();
            }
        }
        public void run() throws InterruptedException {
            var requests = new ArrayList<RPC>();
            var latch = new CountDownLatch(4);
            requests.add(new RPC("rpc://A", latch));
            requests.add(new RPC("rpc://B", latch));
            requests.add(new RPC("rpc://C", latch));
            requests.add(new RPC("rpc://D", latch));
            requests.forEach((x) -> new Thread(x).start());
            latch.await();
            // merge 4 request result
            System.out.println("merge");
    
        }
    
        public static void main(String[] argv) throws InterruptedException {
            var test = new CountDownLatcherTest();
            test.run();
        }
    
    }

实现原理分析

CountDownLatch直接借用了AQS的能力。 AQS有一些控制共享进入临界区的方法。先设置AQS的状态为希望共享进入临界区的线程数量,再控制tryAcquireShared 方法的实现,每次有线程进入临界区就把状态减1

下面程序取自CountDownLatch中内部类Sync源代码,Sync继承于AQS。当tryAcquireShared 返回>0的值时,代表需要上锁。 tryReleaseShared 每次执行就把状态减少1,当状态=0时相当于锁住。

    protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
    }
    
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

countDown 的程序如下,相当于每次把状态的值减少1。

    public void countDown() {
       sync.releaseShared(1);
    }

最后我们思考下,CountDownLatch解决了什么问题?解决的还是多个线程间协作(通信)处理任务的问题,实现了一套屏障机制。 和CyclicBarrier相比,它更适合处理少量需要同时并发,并且需要同步结果的任务。

Phaser

Phaser提供的也是屏障能力,可以把Phaser理解成一种实现屏障能力的框架。 可以用来实现CyclicBarrier和CountDownLatch。后面我们学习TaskForkPoll的时候,也会用到Phaser去实现。

作为框架,最重要的就是思考清楚Barrier这个领域有哪些领域知识,下面我们来总结一下:

  1. 屏障(Barrier):合作的线程们在屏障上等待, 然后进入同步点

  2. 同步点(Synchronization Point),通过屏障后执行的同步代码

    image-20201213144829225

  3. 合作方数量(paties),就是互相等待的线程数量,只有当等待数量达到parties,才会进入同步点

  4. 到来(arrive),代表一个线程到达屏障,并等待,每次有线程到来,arrives + 1

  5. 到达数量(arrives),带到的线程数量

  6. 等待(wait),代表线程在barrier上等待

  7. 进步(advance),一个线程通过屏障,称为进步,代表工作有进度

  8. 开动/下一阶段(tripping/next pharse):到来的线程数量=parties,开始进入同步点

  9. 阶段(phase number):类似CyclicBarrier中的代,每次完成一次开动,phase number加1

Phaser除了抽象出了上面这些概念,还提供了一个更灵活的能力,让parties 可以随时变更。一个线程可以声明自己是一个合作方。

考虑对单个线程,Phaser可以从这几个方面思考:

  • arrive (到达),在屏障上等待其他合作方, 到达线程数(arrives)增1
  • register(注册),相当于声明自己是一个合作方,将parties 增1
  • waitAdvance(等待进步),在屏障上等待其他线程,数量够了就进入同步点
  • deregister(注销),相当于注销自己,parties减1

以上操作有一些是放在一起用的,比如说arriveAndWaitAdvance 方法,相当于增加了phaser number,又同时等待进步。比如说arriveAndDeregister 相当于,到来,但是不等待,并注销自己。

有了上面的能力,等价于CyclicBarrierawait 的是arriveAndWaitAdvance 。实现CyclicBarrier 的循环结构,需要在每个同步点再创建对应数量的任务。下面这段程序用Phaser模拟了CyclicBarrier的作用。 但是相比CyclicBarrirer,我不用提前设置总共有多少个线程。

    public class PhaserTest {
    
        Phaser phaser = new Phaser();
        ExecutorService executorService = Executors.newCachedThreadPool();
    
        class Worker implements Runnable{
            @Override
            public void run(){
                phaser.register();
                while(true) {
                    try {
                        Thread.sleep(1000);
                        System.out.println("I'm working! @" + phaser.getPhase());
                        phaser.arriveAndAwaitAdvance();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    
        public void run() throws InterruptedException {
            phaser.register();
            executorService.execute(new Worker());
            executorService.execute(new Worker());
            executorService.execute(new Worker());
            executorService.execute(new Worker());
            while(true) {
                phaser.arriveAndAwaitAdvance();
                // Do some sync. point work;
                System.out.println("Sync...." + phaser.getPhase());
            }
        }
    
        public static void main(String[] argv) throws InterruptedException {
            var test = new PhaserTest();
            test.run();
        }
    
    }

Exchanger

Exchanger帮助我们在两个线程间交换数据。交换数据本身不是一个原子操作。比如交换a和b,需要一个临时变量t:

    t = a;
    a = b;
    b = t;

上面的程序不是一条原子操作,因此线程间如果要交换数据,需要提供一个专门的能力。

什么时候需要交换数据呢?

比如:生产者读取了一个Buffer的数据,交给消费者处理。然后消费者处理完,再把Buffer传回给生产者。

下面是一段伪代码:

    var exchanger = new Exchanger();
    
    // Thread 1 : 生产者
    初始化: writeBuffer = 空
    while(true) {
        writeDataBuffer(writeBuffer);
        writeBuffer = exchanger.exchange(writeBuffer);
    }
    
    
    // Thread2: 消费者
    初始化: readBuffer = 空
    while(true) {
        var readBuffer = exchanger.exchange(readBuffer);
        var str = readFromBuffer(readBuffer);
        
    }

消费者不断清空Buffer,然后调用exchanger 的Exchange方法把空Buffer给生产者。 生产者填满Buffer,然后把它给消费者。当生产者触发exchange方法时,线程会开始等待,直到消费者也同时触发。 通常这个类作用于两两线程之间处理交换数据,如果多个线程同时触发exchange将会触发不可控的随机行为。

上面这个例子中,允许一个线程在读取数据的同时,另一个线程在针对结果进行计算。 大家可以思考这个模式和CyclicBarrier比较,哪个更好?

本质上,这个方式就是实现了一个paties=2的CyclicBarrier,只不过交换数据的行为我们不需要再实现一个两个线程的公共变量,并用锁)

现在我们来思考下Exchanger解决了什么问题?解决了线程间交换数据的问题。交换是一个高效操作,比如说交换Buffer,节省了内存和计算消耗(比起两个Buffer)。

总结

最后请大家再思考一个问题,同步器解决了什么问题?

其实核心就是一个问题,同步器提供基于同步的线程间协作。 有交换数据,有互相等待,有控制并发量等等。

至此,同步器就告一段落。最底层的CAS,Monitor,到中间的AQS,再到介绍了6种同步器。我希望大家返回器思考下,通过同步器上、中、下3篇的学习,有没有理解XXX为什么这样设计这个核心的问题?这个XXX可能是synchronized,可能是AQS,可能是CountDownLatch,可能是CyclicBarrier。回答这类问题,第一大家要从他们的使用场景分析,第二大家要懂得他们的实现原理。而且一定要对比着看,多问自己A能不能用B替代,这样的问题。

下一节课,我们学习内存一致性模型和volatile。

文章来源于自己总结和网络转载,内容如有任何问题,请大佬斧正!联系我