Skip to content

同步器(中)——AbstractQueuedSynchronizer

面试官:给我讲讲AQS?

求职者:AQS是什么?

和CAS一样,AQS也是Java并发编程的一个基石。CAS提供底层的原子操作,AQS提供同步器的开发框架。但是对于绝大多数Java程序员,平时虽然用着基于AQS的各种工具,比如Semaphore、CyclicBarrier、CountDownLatch、ReentrantLock等等——却不知道AQS究竟是什么?多数工程师用不到的东西,有很重要,只要多一点点的好奇心,轻轻点一下代码追踪,多多少少就会看到AbstractQueuedSynchronizer这个类。因此,反而面试场景AQS常常出现。

这节课主要关联的面试题目有:

  1. 手写程序:如何用AbstractQueuedSynchronizer实现Mutex?
  2. 简述AbstractQueuedSynchronizer如何工作?
  3. AbstractQueuedSynchronizer可以用来做什么?
  4. AQS内部的CHL算法工作原理是什么?
  5. CAS在AQS中的作用是什么?
  6. AQS如何实现公平性?

什么是AQS?

下面我会带大家对之前学过的一些知识进行一个梳理,让我们看看为什么之前的工具还不够,Java需要提供AQS。

复习和思考

首先,我们发现并发环境下会产生竞争条件,有临界区需要保护。保护的方式有很多种,总体来说:

  • 互斥量(Mutex)对临界区上锁,只允许一个进程进入。比如synchronized关键字,相当于上锁和解锁。它的底层是Monitor对象,上锁和解锁会被实现成虚拟机指令monitorenter/monitorexit。这里根据不同的并发度,Java还采用了逐级加重锁的机制——从偏向锁、轻量级锁到重量级锁。 重量级锁就是操作系统在管理了。思考Java的线程谁在调度?
  • 另一个控制方法是信号量。信号量允许给定数量的线程进入临界区。互斥量可以看做一中特殊的信号量。

互斥量可以基于CAS+等待队列实现。Monitor中设置的EntrySet和WaitSet,本质就是等待队列。信号量也可以用类似的方案实现。

无论是实现互斥量还是信号量,我们都需要线程间的写作。让有的线程休眠,然后唤醒某些线程。这里有3种API:

  • Object.wait/Object.wait(要求获得Monitor锁)
  • ReentrantLock.newCondition创建出的Condition对象提供的await/signal。要求获得ReentrantLock的锁。
  • 不需要锁的底层API
    • LockSupport.park
    • LockSupport.unpark

如果用户需要实现线程的信号写作,比如让一些线程休眠,用户要么使用synchronized 关键字或者ReentrantLock。但是这样的方法太笨重,更轻量的是方案是使用cas 循环。可如果用户使用cas 循环,就需要使用Atomic类型,或者更底层的Unsafe类。 另外,如果用户想要在synchronizedReentrantlock 外休眠线程,就需要用LockSupport.park方法。

那么当用户想自己实现一些复杂的并发控制数据结构,是不是都需要用到UnSafe类和LockSupport类呢? —— 如果用户不想接触那么底层的程序,就可以用CAS实现自己想要实现的并发控制数据结构。

另一方面,大家可以回顾下之前我们写的生产者、消费者的例子。中间有大量上锁、解锁的操作,而实际上,我们完全可以考虑用CasLoop+UnSafe去实现。只不过这样实现会比利用已有的ReentrantLock更加复杂。但是如果是某些特殊的队列,我们可能真的有自己实现的需求。

再举个例子,比如说Semaphore的实现,我们可以考虑:

// 线程安全的queue
queue = {}

// state 初始值 ,最多允许3个线程通过
int state = 3

// 多线程并发控制的程序,Thread's
// 进入临界区前执行do...while
int localState
do {
   
    localState = state
  // invariant : localState >= 0
  // 不会发生更新为-1的操作
  if localState == 0 {
      queue.add(Thread.currentThread())
      LockSupport.park()
  }
} while( !UnSafe.cas(&state, localState, localState - 1) )


// 临界区程序

// 退出临界区
while( !UnSafe.cas(&state, state, state+1) ){     
}
LockSupport.unpark( queue.remove())

如果多个线程同时执行do-while处的程序,那么最多同时只有3个线程可以执行临界区程序。我们使用了队列,是因为在并发非常高的时候,假设临界区需要执行10ms,那么这个时候就不可以让这些线程去争夺10ms的临界区了(会消耗大量的CPU资源),我们需要考虑增加队列。 这个模型我们在Monitor中出现非常多了,如果你不明白,可以翻一下Monitor关联的那讲。

如果让用户实现从自旋竞争升级到操作系统休眠线程的逻辑,然后再让用户实现并发控制的链表用于模拟FIFO的队列。——这样对用户的要求太高,心智负担太重。——因此我们需要给用户更加简单的实现方式,给他们提供一个同步器开发框架,也就是AQS。

最后,再举一个死锁的例子,比如说小明和小红鞠躬用程序表达,双方主动向对方鞠躬,看到对方鞠躬后进行回礼,类似:

class Deadlock {

    static class Friend {
        private final String name;
        private final ReentrantLock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public void bow(Friend bower) {
            lock.lock();
            System.out.format("%s: %s 向我鞠躬!%n", this.name, bower.getName());
            bower.bowBack(this);
            lock.unlock();
        }
        public void bowBack(Friend bower) {
            lock.lock();
            System.out.format("%s: %s"
                            + " 回了我!%n",
                    this.name, bower.getName());
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        final Friend xiaoming =
                new Friend("小明");
        final Friend xiaohong =
                new Friend("小红");
        new Thread(() -> xiaoming.bow(xiaohong)).start();
        new Thread(() -> xiaohong.bow(xiaoming)).start();
    }
}

上面的程序小红和小明都先拿到自己Friend对象的锁,因此结果会打印出:

小明: 小红 向我鞠躬!
小红: 小明 向我鞠躬!

但是,再bow 方法的过程中,小明、小红都试图在bowBack 方法中拿对方对象的锁。这样互相等待锁,就会造成死锁。

AQS

AQS是一个Java的同步器开发框架。 用AQS开发的同步器比如说ReentrantLock区别于synchronized ,是纯Java的实现。synchronized 也被称为内部锁(build-in lock或者intrinsic lock), 是C/C++实现的底层的Monitor。而AQS提供完整基于Java的实现,且用户不需要使用底层API,比如说Unsafe和LockSuppot。

image-20201210172234013

AQS的能力

  • 提供同步元语(锁、解锁、信号量限制等)
  • 提供高性能生产者消费者结构
  • 提供Non-Blocking能力(cas\tryLock等)
  • 提供定时能力
  • 提供中断能力
  • 提供线程间写作(Condition)
  • 提供扩展数据结构的能力

做为一个底层的通用框架,AQS的能力围绕两个因子展开。

  1. AQS内部实现了一个整数状态(state),然后围绕这个状态提供了cas 操作。用户只需要提供关键的cas 状态操作步骤,AQS实现cas loop 。(具体见后续Mutex的代码示例)
  2. AQS内部实现了两个高性能的队列
    1. 一个用于获取锁、信号量等线程失败后进行等待等待
    2. 另一个用于线程们在某个条件变量上等待

AQS的性能层面考虑

image-20201210193600895

性能层面AQS使用高性能队列优化线程争夺自旋锁的场景。类似synchronized 关键字中从轻量级锁到重量级锁升级的逻辑。

功能层面

从功能上说,AQS要提供类似synchronized 关键字提供的功能。下图是我对AQS能力的一个简单整理:

image-20201210112725849

上图中获取锁的能力分成了阻塞版本(acquire),和非阻塞版本(tryAcquire);释放锁是release 。然后还需要3个处理突发事件的能力:

  1. 超时(阻塞足够的时间可以产生一个异常,也就是一个中断,中断当前执行的程序,并且释放资源)
  2. 取消(主动触发取消正在执行的任务)
  3. 中断(允许其他线程中断当前执行的任务,实现抢占)

这3个能力对后面我们要学习的解决饥饿问题(死锁、活锁)非常有帮助;而且可以帮助用户针对业务场景实现自己的同步逻辑。

AQS还需要提供重入的能力,就是允许用户锁多次。

除了上面提到的6个能力外,AQS还需要提供线程间同步的能力,就是notify/wait。

性能层面

性能层面,AQS和synchronized 一样,内部需要实现等待队列。先让线程通过自旋锁进入一个等待队列,这样可以节省大量自旋的时间。大多数情况下,自旋进入等待队列,比自旋锁获取锁临界区容易太多。思考下面这个逻辑:

acquire(&lock);
// 10ms的临界区操作
release(&lock);

如果有10000个线程直接自旋竞争上面的锁,每10ms才有1个线程进入临界区。 如果是10个CPU,在1s内理论极限只有1000个线程能够拿到锁,其他线程都需要多次自旋,然后休眠。如果使用等待队列,10000个线程可以快速休眠,然后再从等待队列中拿出线程去进入临界区。这样节省了CPU资源。

大家可以对比下图中的两种方式:

在并发量大,且临界区时间开销较大时(10ms),方式1对CPU压力更大,因为多数线程需要执行自旋锁多次。

另一方面,如果并发量小、临界区时间开销较小呢?这种情况下两种方法开销差距不大,因为并发小,多数线程可以直接通过自旋锁,快速进入临界区。

大家可还以思考:如果每个线程只自旋一次,拿不到锁马上休眠可不可以呢?这种情况在并发量小、临界区执行时间短的情况,是不好的——自旋锁在并发量少的时候很好用。在并发量很大时,这个方式是可以的。

AQS选择的是上面的方案2,这不是在所有场景最优的方案,但是是在大多数情况下,都表现比较较好的一个方案。

AQS的工作原理

我这里用一段伪代码来描述AQS加锁的工作原理:

void acquire() {
  while( 尝试获取锁,如果失败 ) {    
    如果队列中没有当前线程:queue.add( currentThread ); // 加入队列
    park(); // 休眠线程    
  }
}

AQS是一个框架,尝试获取锁的逻辑由用户提供。AQS会在用户提供获取锁的程序基础上进行自旋,如果用户的程序返回true ,那么就进入临界区。

竞争被AQS分成两个阶段:

  1. while部分进行自旋,如果在这个部分拿到锁,类似插队
  2. 竞争的失败者在队列中休眠(park),直到有竞争成功者退出(unpark)

这里的队列采用CLH实现的高性能队列,是一个利用cas实现的无锁编程结构。具体这个队列的实现我们后面会讲。

实际用户Coding的时候,以继承AQS为主要手段,需要自己重写tryAcquire 方法用于实现获取锁的计算逻辑。重写tryRelease方法实现释放锁的计算逻辑。

// AQS内部源码简化
void acquire() {
  while( !tryAcquire() ) { 
    如果队列中没有当前线程:queue.add( currentThread ); // 加入队列
    park(); // 休眠线程    
  }
}

// 用户实现继承类重写这个方法
@Override
boolean tryAcquire() {
  CAS获得锁的逻辑
  return 获得锁是否成功
}

下面我实际带大家写一个用CAS实现Mutex的程序,帮助大家理解:

public class Mutex {
    private final Sync sync = new Sync();
    static class Sync extends AbstractQueuedSynchronizer{
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }
        protected boolean tryRelease(int arg) {
            return compareAndSetState(1, 0);
        }
    }
    public void lock(){
        sync.acquire(0);
    }
    public void unlock(){
        sync.release(0);
    }

    static int i;
    public static void main(String[] argv) throws InterruptedException {
        var mutex = new Mutex();
        var t1 = new Thread(() -> {
            for(int j = 0; j < 10000; j++) {
                mutex.lock();
                i++;
                mutex.unlock();
            }
        });
        var t2 = new Thread(() -> {
            for(int j = 0; j < 10000; j++) {
                mutex.lock();
                i++;
                mutex.unlock();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("i=" + i);


    }
}

你可以看到实际的代码集中在子类Sync 中。

static class Sync extends AbstractQueuedSynchronizer{
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }
    protected boolean tryRelease(int arg) {
        return compareAndSetState(1, 0);
    }
}

相当于只需要重写tryAcquire 方法,Mutex的tryAcquire 想要用CAS将状态从0设置为1——相当于抢占锁。 这个Mutex设计不可重入。state 是AQS内部的一个整数变量,初始值是0。lock 实际执行的程序是AQS的acquire 方法,AQS的acquire 方法内部会调用tryAcquire 方法。

那么如果想要实现重入,如何修改呢? 我们思考下重入的逻辑。

acquire(&lock) // 1
acquire(&lock) // 2
release(&lock)  // 1
release(&lock)  // 0

重入其实就是允许acquire多次,每次acquire 锁自增1,release 的时候减 1。如果对象已经上锁,就不需要次获得锁,直接将锁+1。在release 的时候,先将锁减一,如果锁为0,就相当于交出了锁。下面程序摘自ReentrantLock ,可以看到tryAcquire 如果发现state 不是0,就不继续获取锁了。

protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

 final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    if (compareAndSetState(0, 1)) { // first attempt is unguarded
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
        int c = getState() + 1;
        if (c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
    return false;
}

final void lock() {
    if (!initialTryLock())
        acquire(1);
}

而每次lock 会先判断当前线程是不是已经拥有锁,如果已经拥有,就增加state 的值,但是不再获取锁。

实现Semaphore的例子

class SemaphoreExample {

    static class Semaphore extends AbstractQueuedSynchronizer{

        public Semaphore(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            var available = getState();
            var left = available - 1;
            if(available == 0) {
                return -1;
            }
            if(compareAndSetState(available, left)){
                return left;
            }
            return -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            var available = getState();
            return compareAndSetState(available, available + 1);
        }
    }

    public static int i = 0;

    public static void main(String[] args) throws InterruptedException {


        var semaphore = new Semaphore(3);
        for(int i = 0; i < 1000; i++) {
            new Thread(() -> {
                semaphore.acquireShared(0);
                try {
                    System.out.println("go");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                semaphore.releaseShared(0);
            }).start();
        }
    }
}

AQS队列(CLH)

在一些优秀岗位面试的时候,有可能会讨论到AQS内部队列的实现,因为这是一个非常经典的Lock-Free数据结构。有关Lock-Free,后面我们会有专门的小节讨论。

当AQS的一个线程发生阻塞时,需要将线程入队。当需要从队列中选择一个线程执行时,会把最早进入的元素出队。

为了应对高并发场景,AQS的队列采用了CLH队列。CLH是Craig、Landin和Hagersten,该算法的3位作者。

思考:普通的链表怎么实现队列?

为了弄清楚CLH的原理,大家可以先思考如果是链表我们如何实现插入、删除(入队,出队)操作。

入队(插入),我们可以选择头插法:

代码表示头插法:

void insert(Node newNode){
  newNode.next = head;
  head = newNode;  
}

如果考虑到竞争条件,需要用cas 更新head:

void insert(Node newNode){  
  newNode.next = head
  while(!cas(&head, head, newNode)){}
}

看来插入问题不大,但是如何出队呢? 我们这时需要拿到Tail的前一个元素。而Tail到它的前一个元素,没有指针。聪明的朋友,马上会想到:是不是应该使用双向队列?但是双向队列没有可以直接维护节点原子性的cas 操作,因此双向队列需要进行多个原子操作,容易死锁(死锁问题,我们会在后面的课讨论)。那么用数组实现队列行不行呢?——数组容量默认设置多大呢?容量不够如何扩容呢? 这些都是麻烦的问题。

因此,我们需要一种单向链表,能够解决Tail没有向前指针的问题?那么如何做呢?

CLH的解决方案

CLH还是采用单向链表实现队列,只不过多了一个哑巴节点。

插入(入队)采用在尾部插节点。

删除(出队)时非常方便, 只需要操作头指针指向要出队的线程。

有朋友可能会问,线程没有回收啊? 没删除这个节点啊? ——这个没关系,CHL认为Head总是指向哑巴节点,这样虽然没有真的删除掉线程,但是线程已经变成新的哑巴节点了。

AQS的公平性

如果每个线程到来,先排队,然后按照先入先出的原则,实际上实现了一种朴素的公平性。 所谓,“公平”是相对于衡量标准的。没有绝对的公平。 通常在竞争过程中,能做到先到先得,那么认为是公平的。如果有线程可以后到先得,那么就破坏了公平性。

AQS作为一个框架,既要支持公平性,也要支持不公平性。 而ReentrantLock作为AQS的一种实现,本身也提供了配置项,配置锁是不是公平的。

ReentrantLock内部继承关系

ReentrantLock内部有一个静态类Sync继承于AQS,实现了大部分重要的功能。 为了实现公平锁和非公平锁,Sync扩展了两个子类FairSync和UnFairSync。我们来对比下实现:

// 非公平锁
protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
    
// 公平锁
protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && !hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

上面代码大同小异,区别在于公平锁在抢占时,如果队列中已经有元素,就不再用自旋抢占。非公平锁,新线程只要执行tryAcquire 就会尝试抢占(插队)。允许新的线程插队(自旋),可以提高性能。比如在一个线程工作完成的同时,有另一个线程尝试获取锁,把锁直接给这个线程,可以节省一次线程切换的时间。(如果要从队列中取一个出来,恢复执行,要进行Context-Switch)。

总结

本节我们深度解读了AbstractQueuedSynchronizer。我希望今天每一个学了这节课的朋友,深度反思一下,自己平时做的程序设计有没有思考把一些相似功能抽象成一个基础框架。学有余力的朋友可以读一下AbstractQueuedSynchronizer的源码,如果读起来吃力,就在脑海中把我讲的再思考一遍。特别是,要反复问自己为什么要这样设计?

另外,就是希望大家可以理解为什么要自旋,这个概念已经反复出现太多次了。 自旋本质,是用多消耗CPU换取时间。在并发量小的时候,自旋有优势。并发量大,就是资源的浪费。软件设计,很多时候就是和不确定性在斗争。AQS和synchronizer性能上,不一定有优势,最大的优势在于所有线程都在Java端实现,这样让我们分析性能和执行过程时有抓手,提高我们程序的可预测性。

文章来源于自己总结和网络转载,内容如有任何问题,请大佬斧正!联系我