面试官:说6个Java的同步器?
面试官:说6个Java的同步器?
求职者:
- ReentrantLock
- Semaphore
- CyclicBarrier
- CountDownLatch
- Phaser
- Exchanger
面试官:还有么?
求职者:给我AQS,我随便写~你要几个?
大家注意,直接问XXX怎么用是比较容易回答的,只需要读过文档。但是如果问什么场景会用到XXX?就比较麻烦了,必须找到合适的场景。 还有一种比较麻烦是XXX怎么实现?必须能够说出实现原理。因为是面试课,我会重点关注在场景和实现上,帮助大家提高。
这会关联到非常多的面试题:
- synchronized和ReentrantLock的区别?
- 什么场景下需要锁可以重入?
- 什么是公平锁/非公平锁?
- 什么场景用Semaphore?
- 简述Phaser/CountDownLatch/CyclicBarrier三者的应用场景和实现原理?
- 什么场景会用到Exchanger?
ReentrantLock
Re-entrant-Lock(Re:重入,entrant:进入者, Lock:锁), 中文是可重入锁。虽然名字比较复杂,但是大家可以理解成synchronized
关键字的另一个选择,主要目的就是实现互斥。
互斥,有点类似排队的能力(见上图),当然可重入锁和synchronized
关键字有很多区别。
比如这道面试题:ReentrantLock和synchronized有什么区别和?
相同点:
- 都可以对临界区进行互斥保护(锁、解锁)
- 都可以重入
- 都提供线程协作(Monitor提供Object.wait/notify等,ReentrantLock提供条件变量)
- 都提供锁的升级逻辑和队列
- monitor: 偏向锁->轻量级锁->重量级锁
- AQS: CAS竞争->休眠+排队竞争
- 都提供等待队列
- monitor: EntrySet, WaitSet
- AQS : CLH队列
不同点:
- 基于AQS vs 基于Monitor
- Java生态 vs 非Java生态
- 响应线程中断(InterruptException) vs 不响应
- 提供tryLock vs 不提供
- 跨Block vs 单Block
- 可配置公平性 vs 不可配置
什么是可重入?
可重入锁的实现主要就是AQS。重写AQS的tryAcquire
和tryRelease
方法。因为锁是可以重入了,tryAcquire
的条件除了可以用cas
操作占有锁外,当前线程不能已经占有了锁。如果当前线程占有了锁,当前线程不会再重新获得锁,而且需要自增锁的状态。
AQS协助记录了当前线程,并且协助实现了一个整数的状态(或者将这个状态理解成锁本身)。锁本身一个整数变量就足够。不知道大家还记不记得synchronized
如何抢占锁?——复习一下,synchronized
抢占锁,比如synchronized(obj)
就是抢占obj
对象关联的 Monitor
。在Monitor
对象中写入线程的引用,就相当于抢占了锁。而引用是什么?引用是一个内存地址——还是整数。
可重入锁因为可以重入,ReentrantLock实现的tryLock
方法可以执行多次:
var lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
lock.lock();
// 临界区
lock.unlock();
lock.unlock();
}).start();
多次lock
,unlock
也需要多次。每次lock
AQS的state
增1。每次unlock
AQS的state
减1。lock
次数多余unlock
次数就相当于没有释放锁。因此只有将state
写为0的unlock
操作才真的释放锁。
这里有一道关联面试题:什么场景下会需要锁可以重入?
通常情况下我们没有需要多次加锁的需求。之所以提供这样的特性,是为了如果人们真的加锁了多次,程序还可以工作。——释放程序员的心智。
还有一道关联面试题:什么是公平锁、什么是非公平锁?
关于公平锁上节课已经提到过,如果对于一个资源,线程可以先到先得,就是朴素的公平锁。公平是相对的,你可以制定规则,比如优先级高的先得,不能说就不公平——但是不够朴素。 ReentrantLock中有个公平锁的配置项,就是就是朴素的公平。
ReentrantLock(boolean fair)
构造时如果不写fair
参数默认是false
。这个是因为性能考虑,允许新加入的线程自旋竞争锁,会比从队列中取出一个再重新竞争速度更快。(具体可以参考上一节课中有关公平性的性能分析)
信号量(Semaphore)
信号量(Semaphore)核心价值是允许N个线程同时进入临界区,N可以控制。如果N=1的时候,Semaphore就转换成了Mutex(互斥锁)。如果N>1,Semaphore会控制进入临界区的线程数量。
当N>1时,Semaphore不控制竞争条件,只控制进入的线程数量,因此可以和synchronized
或者ReentrantLock
一起使用控制竞争。 Semaphore的概念,有点像多道程序设计,见上图,上图是N=2的情况。
Semaphore内部有一个状态(继承自AQS),用于标识还有多少个线程可以进入临界区。状态初始值可以在构造函数中设置。初始值=1,以为着实现Mutex。accquire
操作会减少这个状态,release
操作会增加这个状态。
下面程序帮通过Semaphore控制了并发数量最大是10。如果超过时,线程会在semaphore.acquire
处休眠,直到某个线程调用semaphore.release
释放计数。
var semaphore = new Semaphore(10);
var lock = new ReentrantLock();
Runnable runnable = () -> {
try {
semaphore.acquire();
// 开销较大的计算
lock.lock();
// 临界区
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
};
for(int i = 0; i < 1000; i++) {
new Thread(runnable).start();
}
Semaphore
最常见的一种应用场景就是实现生产者/消费者模式。 比如下面这段程序:
final int MAX = 100;
Semaphore empty = new Semaphore(MAX);
Semaphore full = new Semaphore(0);
void produce() throws InterruptedException {
empty.acquire();
synchronized (queue) {
// 操作队列
}
full.release();
}
void consume() throws InterruptedException {
full.acquire();
synchronized (queue) {
// 操作队列
}
empty.release();
}
上面的程序通过两个Semaphore控制生产者、消费者的队列中最多100个元素。 同时,当队列满时,生产者休眠;当队列空时,消费者休眠。这段程序帮助多线程环境减少了很多边界条件的判断。
用AQS实现Semaphore的部分代码如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
当需要acquire
时,Semaphore先获取当前的状态avaliable
——还可以进入临界区的线程数量。然后可进入的线程数量减去需要申请进入的数量(通常是1),就可以知道接下来会剩余多少个。然后用CAS操作不断尝试抢占AQS的状态。
到这里,关于信号量我们讲完了,问几道面试题:
问:信号量的做用是什么?
答:最核心的作用就是控制并发量。
追问:具体有什么用?
答:比如用两个信号量可以实现生产者消费者模型?
追问:那为什么不用Semaphore实现ArrayBlockingQueue?
答:因为AQS底层实现的Condition(条件变量)直接基于CLH队列更高效,Semaphore是基于AQS的封装,开销略大一点。
追问:那Semaphore的价值是什么?
答:生产者消费者当然也是场景之一,用Semaphore实现生产者消费者心智负担更小,代码更好些。Semaphore写生产者消费者只需要维护简单的数学加减关系,不用手动调用signal和await。如果不在意写程序的负担,真正用的时候,也可以考虑Condition。但是通常写底层库的人才不在意程序好不好写,在性能最优程序稍微麻烦和性能比价优秀间程序稍微简单,需要选择性能最优的方案。另外,还可以用Semaphore来控制并发量,比如提供服务给其他团队用的时候,用Semaphore控制对方调用的流速。
CyclicBarrier
Barrier叫做屏障,是一种非常重要的同步元语。 Java提供了多种屏障能力,比如CyclicBarrier让线程在一个屏障上等待,然后执行同步快,然后分开执行,然后进入下一个屏障。
图中,灰色部分代表屏障,有两个线程总是在屏障上等待;绿色的部分是同步点的程序,紫色和红色代表两个线程。当然也可以是3个、4个……线程。每个周期成为一个代(generation)。
举个例子:比如有1000W条订单数据需要进程处理/分析。 每个订单数据要获取关联的商品和发货单进行分析。
可以考虑每代CyclicBarrier处理1W条订单数据,开10个线程,每个获取1000个商品信息,再开10个线程,每个获取1000个发货单信息,然后利用CyclicBarrier进行一次同步计算。
我们先弄清一些基本概念,上面这个模型每个代处理10000条订单数据,需要1000代(generation)才能完成。总共要开20个线程,20个线程必须都完成才能执行同步块,我们称为屏障中有20个合作方(party)。
上面的方法中,批量处理数据(读写数据库商品、发货单数据)的一个优势是批量操作数据库速度更快;开多线程的优势是充分利用CPU资源。具体应该如何拆分批次,如何确定线程,要以实际场景经过尝试得到的真实数据为准。
下面程序演示CyclicBarrier的工作过程,我们假设有1000页的数据需要计算。每个CyclicBarrier的代(generation)我们用两个线程处理一页数据。然后在Barrier构造函数中写一个Runnable用于同步,每处理完一次我们在第8行打印一个当页处理完成。
代码中线程执行完自己的任务就调用CyclicBarrier.await
方法,让自己休眠,等待所有线程都调用了await
方法,CyclicBarrir就会进入下一个generation,这时候await方法后面的程序技术执行。
public class CyclicBarrierTest {
CyclicBarrier barrier;
int page = 0;
public CyclicBarrierTest(){
barrier = new CyclicBarrier(2, () -> {
System.out.println("Page " + page + " Done.");
page++;
});
}
void prepareProducts(){
while(page < 1000){
//
try {
Thread.sleep(20); // 模拟读取数据
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
void prepareDeliveryOrders(){
while(page < 1000) {
try {
Thread.sleep(10); // 模拟读取数据
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
void run(){
new Thread(this::prepareProducts).start();
new Thread(this::prepareDeliveryOrders).start();
}
public static void main(String[] argv) {
var test = new CyclicBarrierTest();
test.run();
}
}
相比单线程计算,上述方法可以帮助节省很多CPU、I/O资源,而且更快。
实现原理分析
CyclicBarrier内部有一个ReentrantLock用于保护计算逻辑,还有一个条件变量用于让线程们在条件变量中休眠。newCondition
创建的条件变量来自于AQS的一个内部类——ConditionObject。这个类的实现我这次没有深入讨论。条件变量的作用就是支持大量线程在一个条件变量上休眠,然后集体唤醒他们。
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
如果一个线程调用await
,可以这样实现:
lock.lock();
index = 还有多少个线程就进入下一代
if(index == 0) {
trip.notifyAll(); // 通知所有线程继续执行
} else {
trip.await(); // 让当前线程在条件变量下等待
}
lock.unlock();
条件变量可以用Semaphore
实现,也可以用队列,让休眠的线程记录在一个队列中,比如上节课讲的CLH算法。具体实现,大家可以参考AbstractQueuedSynchronizer的ConditionObject类。 也可以回顾下我们之前讲课用Semaphore实现生产者/消费者模型。
最后我们一起思考下CyclicBarrier解决了什么问题?CyclicBarrier提供了一套协作机制,解决的是多个线程间协作(也可以认为是通信),一起处理任务的问题。如果不提供CyclicBarrier我们可以用ReentrantLock+Condition实现类似的能力,但是CyclicBarrier覆盖的场景确实具有通用性,因此抽象成数据结构非常有价值。
CountDownLatch
CountDownLatch的逻辑简单一些,本质上也是一个Barrier,相当于只有一代CyclicBarrier。比如说实现一个返回首页数据的服务,需要请求多个的微服务——A、B、C、D的数据。这个时候可以用CountDownLatch来形成一个同步点。
下面程序模拟了我们描述的过程,我们抽象了一个叫做RPC的任务,相当于封装一个远程RPC请求。如果4个请求需要同步,那么我们可以用CountDownLatch。每个线程获取数据后,执行一次countDown
方法,这样主线程的latch.await
方法收集够4个线程的countDown之后,就会继续执行。
public class CountDownLatcherTest {
class RPC implements Runnable{
String url;
CountDownLatch latch;
public RPC(String url, CountDownLatch latch){
this.url = url;
this.latch = latch;
}
@Override
public void run() {
System.out.println("request");
latch.countDown();
}
}
public void run() throws InterruptedException {
var requests = new ArrayList<RPC>();
var latch = new CountDownLatch(4);
requests.add(new RPC("rpc://A", latch));
requests.add(new RPC("rpc://B", latch));
requests.add(new RPC("rpc://C", latch));
requests.add(new RPC("rpc://D", latch));
requests.forEach((x) -> new Thread(x).start());
latch.await();
// merge 4 request result
System.out.println("merge");
}
public static void main(String[] argv) throws InterruptedException {
var test = new CountDownLatcherTest();
test.run();
}
}
实现原理分析
CountDownLatch直接借用了AQS的能力。 AQS有一些控制共享进入临界区的方法。先设置AQS的状态为希望共享进入临界区的线程数量,再控制tryAcquireShared
方法的实现,每次有线程进入临界区就把状态减1
。
下面程序取自CountDownLatch中内部类Sync源代码,Sync继承于AQS。当tryAcquireShared
返回>0的值时,代表需要上锁。 tryReleaseShared
每次执行就把状态减少1,当状态=0时相当于锁住。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
而countDown
的程序如下,相当于每次把状态的值减少1。
public void countDown() {
sync.releaseShared(1);
}
最后我们思考下,CountDownLatch解决了什么问题?解决的还是多个线程间协作(通信)处理任务的问题,实现了一套屏障机制。 和CyclicBarrier相比,它更适合处理少量需要同时并发,并且需要同步结果的任务。
Phaser
Phaser提供的也是屏障能力,可以把Phaser理解成一种实现屏障能力的框架。 可以用来实现CyclicBarrier和CountDownLatch。后面我们学习TaskForkPoll的时候,也会用到Phaser去实现。
作为框架,最重要的就是思考清楚Barrier这个领域有哪些领域知识,下面我们来总结一下:
屏障(Barrier):合作的线程们在屏障上等待, 然后进入同步点
同步点(Synchronization Point),通过屏障后执行的同步代码
合作方数量(paties),就是互相等待的线程数量,只有当等待数量达到parties,才会进入同步点
到来(arrive),代表一个线程到达屏障,并等待,每次有线程到来,arrives + 1
到达数量(arrives),带到的线程数量
等待(wait),代表线程在barrier上等待
进步(advance),一个线程通过屏障,称为进步,代表工作有进度
开动/下一阶段(tripping/next pharse):到来的线程数量=parties,开始进入同步点
阶段(phase number):类似CyclicBarrier中的代,每次完成一次开动,phase number加1
Phaser除了抽象出了上面这些概念,还提供了一个更灵活的能力,让parties
可以随时变更。一个线程可以声明自己是一个合作方。
考虑对单个线程,Phaser可以从这几个方面思考:
- arrive (到达),在屏障上等待其他合作方, 到达线程数(arrives)增1
- register(注册),相当于声明自己是一个合作方,将
parties
增1 - waitAdvance(等待进步),在屏障上等待其他线程,数量够了就进入同步点
- deregister(注销),相当于注销自己,
parties
减1
以上操作有一些是放在一起用的,比如说arriveAndWaitAdvance
方法,相当于增加了phaser number,又同时等待进步。比如说arriveAndDeregister
相当于,到来,但是不等待,并注销自己。
有了上面的能力,等价于CyclicBarrier
的await
的是arriveAndWaitAdvance
。实现CyclicBarrier
的循环结构,需要在每个同步点再创建对应数量的任务。下面这段程序用Phaser模拟了CyclicBarrier的作用。 但是相比CyclicBarrirer,我不用提前设置总共有多少个线程。
public class PhaserTest {
Phaser phaser = new Phaser();
ExecutorService executorService = Executors.newCachedThreadPool();
class Worker implements Runnable{
@Override
public void run(){
phaser.register();
while(true) {
try {
Thread.sleep(1000);
System.out.println("I'm working! @" + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void run() throws InterruptedException {
phaser.register();
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
while(true) {
phaser.arriveAndAwaitAdvance();
// Do some sync. point work;
System.out.println("Sync...." + phaser.getPhase());
}
}
public static void main(String[] argv) throws InterruptedException {
var test = new PhaserTest();
test.run();
}
}
Exchanger
Exchanger帮助我们在两个线程间交换数据。交换数据本身不是一个原子操作。比如交换a和b,需要一个临时变量t:
t = a;
a = b;
b = t;
上面的程序不是一条原子操作,因此线程间如果要交换数据,需要提供一个专门的能力。
什么时候需要交换数据呢?
比如:生产者读取了一个Buffer的数据,交给消费者处理。然后消费者处理完,再把Buffer传回给生产者。
下面是一段伪代码:
var exchanger = new Exchanger();
// Thread 1 : 生产者
初始化: writeBuffer = 空
while(true) {
writeDataBuffer(writeBuffer);
writeBuffer = exchanger.exchange(writeBuffer);
}
// Thread2: 消费者
初始化: readBuffer = 空
while(true) {
var readBuffer = exchanger.exchange(readBuffer);
var str = readFromBuffer(readBuffer);
}
消费者不断清空Buffer,然后调用exchanger
的Exchange方法把空Buffer给生产者。 生产者填满Buffer,然后把它给消费者。当生产者触发exchange方法时,线程会开始等待,直到消费者也同时触发。 通常这个类作用于两两线程之间处理交换数据,如果多个线程同时触发exchange将会触发不可控的随机行为。
上面这个例子中,允许一个线程在读取数据的同时,另一个线程在针对结果进行计算。 大家可以思考这个模式和CyclicBarrier比较,哪个更好?
本质上,这个方式就是实现了一个paties=2的CyclicBarrier,只不过交换数据的行为我们不需要再实现一个两个线程的公共变量,并用锁)
现在我们来思考下Exchanger解决了什么问题?解决了线程间交换数据的问题。交换是一个高效操作,比如说交换Buffer,节省了内存和计算消耗(比起两个Buffer)。
总结
最后请大家再思考一个问题,同步器解决了什么问题?
其实核心就是一个问题,同步器提供基于同步的线程间协作。 有交换数据,有互相等待,有控制并发量等等。
至此,同步器就告一段落。最底层的CAS,Monitor,到中间的AQS,再到介绍了6种同步器。我希望大家返回器思考下,通过同步器上、中、下3篇的学习,有没有理解XXX为什么这样设计这个核心的问题?这个XXX可能是synchronized,可能是AQS,可能是CountDownLatch,可能是CyclicBarrier。回答这类问题,第一大家要从他们的使用场景分析,第二大家要懂得他们的实现原理。而且一定要对比着看,多问自己A能不能用B替代,这样的问题。
下一节课,我们学习内存一致性模型和volatile。