概念
AQS 全称是AbstractQueuedSynchronizer
,翻译过来的意思就是抽象队列同步器,Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于它实现的。
简单来说:AQS 是一个抽象类,为同步器提供了通用的 执行框架。有点类似 设计模式中的模板方法模式
它定义了 资源获取和释放的通用流程,而具体的资源获取逻辑则由具体同步器通过重写模板方法来实现。 因此,可以将 AQS 看作是同步器的 基础“底座”,而具体逻辑需要实现类来应用
核心思想
AQS 核心思想是,如果被请求的共享资源空闲,则将请求该资源线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁 实现的,即将暂时获取不到锁的线程加入到队列中。
CLH
针对自旋锁的问题,演进出一种基于队列的自旋锁 即 CLH,它适用于多处理器环境下的高并发场景
原理是通过维护一个隐式队列,它将争抢的线程组织成一个队列,通过排队的方式按序争抢锁。
每个线程不再 CAS 争抢一个变量,而是自旋判断排在它前面线程的状态,如果前面的线程状态为释放锁,那么后续的线程则抢锁。
因此,CLH 通过排队按序争抢解决了锁饥饿的问题。通过 CAS 自旋监听前面线程的状态避免的总线风暴问题的产生。
不过 CLH 还是有缺点的: 自旋期间线程会一直占用CPU 资源
注意! 隐式队列 指的是不同线程之间是没有真正通过指针连接的,仅仅是利用 AtomicReference + ThreadLocal实现了隐式关联
public class CLHLock {private static class CLHNode {volatile boolean isLocked = true; // 默认加锁状态}private final ThreadLocal<CLHNode> currentNode;private final ThreadLocal<CLHNode> predecessorNode;private final AtomicReference<CLHNode> tail;public CLHLock() {this.currentNode = ThreadLocal.withInitial(CLHNode::new);this.predecessorNode = new ThreadLocal<>();this.tail = new AtomicReference<>(new CLHNode());}public void lock() {CLHNode node = currentNode.get();CLHNode pred = tail.getAndSet(node);predecessorNode.set(pred);// 自旋等待前驱节点释放锁while (pred.isLocked) {}}public void unlock() {CLHNode node = currentNode.get();node.isLocked = false; // 释放锁currentNode.set(predecessorNode.get()); // 回收当前节点}
}
AQS优化CLH
AQS 在 CLH 锁的基础上进一步优化,形成了其内部的 CLH 队列变体。主要改进点有以下两方面:
1.自旋 + 阻塞: CLH 锁使用纯自旋方式等待锁的释放,但大量的自旋操作会占用过多的 CPU 资源。AQS 引入了 自旋CAS + 阻塞 的混合机制:
-
如果线程获取锁失败,会先短暂自旋尝试获取锁;
-
如果仍然失败,则线程会进入阻塞状态,等待被唤醒,从而减少 CPU 的浪费。
2.单向队列改为双向队列:
-
CLH 锁使用单向队列,节点只知道前驱节点的状态,而当某个节点释放锁时,需要通过队列唤醒后续节点。
-
AQS 将队列改为 双向队列,新增了
next
指针,使得节点不仅知道前驱节点,也可以直接唤醒后继节点,不用让后续结点自旋等待
且前面线程如果等待超时或者主动取消后,需要从队列中移除,且后面的线程需要“顶“上来
AQS 的核心机制
AQS 使用 int 成员变量 state
表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。
// 共享变量,使用volatile修饰保证线程可见性 private volatile int state;
另外,状态信息 state
可以通过 protected
类型的getState()
、setState()
和compareAndSetState()
进行操作。并且,这几个方法都是 final
修饰的,在子类中无法被重写。
//返回同步状态的当前值 protected final int getState() {return state; }// 设置同步状态的值 protected final void setState(int newState) {state = newState; } //如果当前状态值等于预期值(expect),则将其更新为新值(update) protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
ReentrantLock对AQS 的使用
以可重入的互斥锁 ReentrantLock
为例,它的内部维护了一个 state
变量,用来表示锁的占用状态。
state
的初始值为 0,表示锁处于未锁定状态。
当线程 A 调用 lock()
方法时,会尝试通过 tryAcquire()
方法独占该锁,并让 state
的值加 1。
-
如果成功了,那么线程 A 就获取到了锁。
-
如果失败了,那么 就会通过
addWaiter()
方法将线程 A 封装为 Node 节点加入AQS
内部的队列中加入到一个等待队列(CLH 变体队列)中,直到其他线程释放该锁。-
通过
CAS
操作去更新tail
指针指向新入队的 Node 节点,CAS
可以保证只有一个线程会成功修改tail
指针,以此来保证 Node 节点入队时的并发安全。
-
-
入队后其实还有两步
-
首先尝试获取资源:如果当前结点的前继节点为
head
节点,则说明该结点是队头嘛,尝试获取资源成功,然后把它从等待队列移除 -
如果不是,那么当前节点的唤醒需要依赖于上一个节点。那么它的状态就是为
CANCELLED
,CANCELLED
状态的节点没有获取到锁,也就无法执行解锁操作对当前节点进行唤醒。因此在阻塞当前线程之前,需要跳过CANCELLED
状态的节点。-
如果发现前继节点的状态是
SIGNAL
,则可以阻塞当前线程。 -
如果发现前继节点的状态是
CANCELLED
,表示上一个节点取消获取锁,则需要跳过CANCELLED
状态的节点。 -
如果发现前继节点的状态不是
SIGNAL
和CANCELLED
,表明前继节点的状态处于正常等待资源的状态,因此将前继节点的状态设置为SIGNAL
,表明该前继节点需要对后续节点进行唤醒 -
当判断当前线程可以阻塞之后,通过调用
parkAndCheckInterrupt()
方法来阻塞当前线程。内部使用了LockSupport
来实现阻塞。LockSupoprt
底层是基于Unsafe
类来阻塞线程
-
-
假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(state
会累加)。这就是可重入性的体现,但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 state
的值回到 0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。
基于 AQS 可以实现自定义的同步器, AQS 提供了 5 个模板方法(模板方法模式)。
-
自定义的同步器继承
AbstractQueuedSynchronizer
。 -
重写 AQS 暴露的模板方法。自定义同步器时需要重写下面几个 AQS 提供的钩子方法:
//独占方式。尝试获取资源,成功则返回true,失败则返回false。 protected boolean tryAcquire(int) //独占方式。尝试释放资源,成功则返回true,失败则返回false。 protected boolean tryRelease(int) //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 protected int tryAcquireShared(int) //共享方式。尝试释放资源,成功则返回true,失败则返回false。 protected boolean tryReleaseShared(int) //该线程是否正在独占资源。只有用到condition才需要去实现它。 protected boolean isHeldExclusively()
AQS资源共享方式
AQS 定义两种资源共享方式:
-
独占,只有一个线程能执行,如
ReentrantLock
) -
共享,多个线程可同时执行,如
Semaphore
/CountDownLatch
。
独占模式
AQS 中以独占模式获取资源的入口方法是 acquire()
,如下:
// AQS public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }
在 acquire()
中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node 节点加入到 AQS 的等待队列中;加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。分别对应以下三个方法:
-
tryAcquire()
:尝试获取锁(模板方法),AQS
不提供具体实现,由子类实现。 -
addWaiter()
:如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。 -
acquireQueued()
:对线程进行阻塞、唤醒,并调用tryAcquire()
方法让队列中的线程尝试获取锁
源码分析
tryAcquire()
方法是 AQS 提供的模板方法,不提供默认实现。
因此,这里分析 tryAcquire()
方法时,以 ReentrantLock的非公平锁(独占锁)为例进行分析,ReentrantLock
内部实现的 tryAcquire()
会调用到下边的 nonfairTryAcquire()
:
// ReentrantLock final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();// 1、获取 AQS 中的 state 状态int c = getState();// 2、如果 state 为 0,证明锁没有被其他线程占用if (c == 0) {// 2.1、通过 CAS 对 state 进行更新if (compareAndSetState(0, acquires)) {// 2.2、如果 CAS 更新成功,就将锁的持有者设置为当前线程setExclusiveOwnerThread(current);return true;}}// 3、如果当前线程和锁的持有线程相同,说明发生了「锁的重入」else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");// 3.1、将锁的重入次数加 1setState(nextc);return true;}// 4、如果锁被其他线程占用,就返回 false,表示获取锁失败return false; }
主要通过两个核心操作去完成资源的获取:
-
通过
CAS
更新state
变量。state == 0
表示资源没有被占用。state > 0
表示资源被占用,此时state
表示重入次数。 -
通过
setExclusiveOwnerThread()
设置持有资源的线程。
如果线程更新 state
变量成功,就表明获取到了资源, 因此将持有资源的线程设置为当前线程即可。