一、锁的分类
1、可重入锁、不可重入锁
重入:
某个线程在获取到一个锁时还可以再次获取这把锁。
可重入锁:
1、当前线程在获取到A锁的情况下,如果再次尝试获取A锁,也是被允许获取到的。
2、synchronized、ReentrantLock、ReentrantReadWriteLock都是可重入锁。
不可重入锁:
1、当前线程在获取到A锁的情况下,如果再次尝试获取A锁,也是被允许获取到的。
2、在Java中,Worker类实现了不可重入锁。
2、乐观锁、悲观锁
乐观锁:
Java中提供的CAS操作就是一种乐观锁的实现。
获取不到锁资源时,也可以再次让CPU调度,重新尝试获取锁资源。
悲观锁:
在获取不到锁资源时,会将当前线程挂起,进入BLOCKED或WAITING状态。在进入阻塞或者等待状态时,会涉及到用户态和内核态的切换,这种切换是比较消耗资源的。
用户态: JVM可以自行执行的指令。
内核态: JVM不可以自行执行的指令,需要调用操作系统才能执行。
3、公平锁、非公平锁
Java中提供的synchronized是一个非公平锁。
ReentrantLock、ReentrantReadWriteLock可以实现公平锁和非公平锁。
公平锁: 非常公平
线程A获取到锁资源后,线程B没获取到就去排队。线程C进来时看到线程A持有锁、线程B在排队,那么线程C就排到B的后面,等到B拿到锁或者取消排队,线程C尝试竞争锁资源。
非公平锁: 不是非常不公平
线程A获取到锁资源后,线程B没获取到就去排队。线程C进来时先尝试获取锁。
1、获取到锁了:线程C先执行。
2、没获取到锁:线程C排到B后面,等待B拿到锁资源或者放弃竞争锁,再次尝试获取锁资源。
4、互斥锁、共享锁
synchronized、ReentrantLock是互斥锁。
ReentrantReadWriteLock中有互斥锁也有共享锁。
互斥锁:
同一时间点,只会有一个线程持有当前互斥锁,其他线程需要等待前面的线程释放锁才能持有。
共享锁:
同一时间点,一个共享锁可以由多个线程共同持有。
二、深入synchronized
1、类锁、对象锁
类锁:使用synchronized给static修饰的方法加锁,锁的就是这个方法所属的类。
对象锁:使用synchronized给普通方法加锁,锁的就是这个方法所属类的对象。
2、synchronized的优化
在JDK1.5中,Doug Lee发布了ReentrantLock,lock的性能远高于synchronized,所以JDK团队就在JDK1.6中,升级了synchronized,做了大量优化。
1、锁消除:
在synchronized修饰的代码块中,如果没有对任何共享资源做操作,就会触发锁消除,即使写了synchronized,也不会加锁。
2、锁膨胀(锁扩张):
如果在一个循环中,频繁地获取锁和释放锁,这样的消耗很大。
锁膨胀就是将锁的范围扩大,避免频繁切换导致不必要的资源消耗(将锁膨胀到for循环外面)。
3、锁升级:
ReentrantLock的实现是基于乐观锁的CAS,先尝试获取资源,实在拿不到才进入挂起状态。
synchronized在JDK1.6之前,只要获取不到锁,就立即进入挂起状态,所以synchronized性能略差。JDK1.6加入了锁升级:
- 无锁(匿名偏向锁):当前对象没有作为锁使用。
- 偏向锁:如果当前锁资源只有一个线程在频繁使用,那么当这个线程过来时,只需要判断指向当前锁的线程是否为自己。
- 如果是自己,就直接获取锁。
- 如果不是,就基于CAS的方式尝试获取偏向锁。如果获取不到锁就升级为轻量级锁。(偏向锁出现了锁竞争的情况)
- 轻量级锁:线程会采用自旋的方式尝试获取锁(自适应自旋次数)。
- 如果获取到锁资源就操作。
- 如果自旋了一定的次数后还拿不到,就升级为重量级锁。
- 重量级锁:就是最传统的synchronized锁,拿不到锁资源就挂起当前线程(用户态&内核态的转换)。
3、synchronized实现原理
是基于对象实现的,每一个java对象在new出来之后会存放到堆内存中,其中这个对象有一个属性叫MarkWord,保存着与锁相关的信息。


MarkWord中通过2~3个比特位标记了四种锁的状态:无锁、偏向锁、轻量级锁(自旋锁)、重量级锁。
4、synchronized的锁升级过程
① 无锁状态
package allwe.lock;import org.openjdk.jol.info.ClassLayout;public class ReadLockStatusTest {public static void main(String[] args) {Object o = new Object();System.out.println(ClassLayout.parseInstance(o).toPrintable());}
}
② 匿名偏向锁(无锁)
锁在默认情况下,开启了偏向锁延迟。
1、偏向锁在升级为轻量级锁的时候,会涉及到偏向锁撤销,需要等到一个安全点(STW)才可以做偏向锁撤销。所以在检测到并发出现时,就可以选择不开启偏向锁,或者是设置偏向锁延迟开启。
2、在JVM启动初期,由于需要加载大量的.class文件到内存中,这个操作会涉及到synchronized的使用,为了避免出现偏向锁撤销,所以JVM做了一个延迟5s开启偏向锁的操作。
3、如果正常开启了偏向锁,那么就不会出现无锁状态,所以此处等待5s后对象的锁状态更改为匿名偏向锁。
package allwe.lock;import org.openjdk.jol.info.ClassLayout;public class ReadLockStatusTest {public static void main(String[] args) throws InterruptedException {Thread.sleep(5000);Object o = new Object();System.out.println(ClassLayout.parseInstance(o).toPrintable());}
}
③ 偏向锁
在出现加锁的情况时,synchronized会判断获取锁的线程是否已经拥有锁资源,如果已经拥有就会将锁升级为偏向锁,指向当前线程。
package allwe.lock;import org.openjdk.jol.info.ClassLayout;public class ReadLockStatusTest {public static void main(String[] args) throws InterruptedException {Thread.sleep(5000);Object o = new Object();synchronized (o) {System.out.println("==============main=============");System.out.println(ClassLayout.parseInstance(o).toPrintable());}}
}
④ 轻量级锁 & 重量级锁
在出现锁竞争的情况时,synchronized会自动将偏向锁升级为轻量级锁,自旋一定次数后还未获取到锁资源就再次升级为重量级锁。
package allwe.lock;import org.openjdk.jol.info.ClassLayout;public class ReadLockStatusTest {public static void main(String[] args) throws InterruptedException {Thread.sleep(5000);Object o = new Object();System.out.println(ClassLayout.parseInstance(o).toPrintable());Thread thread = new Thread(() -> {synchronized (o) {System.out.println("=============t1=============");System.out.println(ClassLayout.parseInstance(o).toPrintable());}});thread.start();synchronized (o) {System.out.println("==============main=============");System.out.println(ClassLayout.parseInstance(o).toPrintable());}}
}
⑤ synchronized的锁升级过程
⑥ 线程中的Lock Record
为什么在偏向锁和轻量级锁的时候,锁指向了线程中Lock Record的指针,并且Mark Word中什么也没记录?
1、在锁升级为偏向锁和轻量级锁的时候,对象的锁指向了某个线程的栈中的Lock Record,这个Lock Record属于哪个线程,就说明哪个线程获取到了锁。
2、锁指向的Lock Record中记录了对象的头信息,也就是锁的Mark Word。
3、当锁再次升级为重量级锁时,就将锁的信息存储到了C++的ObjectMonitor中。
5、重量级锁底层 - ObjectMonitor
存储Mark Word和排队的线程等信息,是C++源码。
三、深入ReentrantLock
1、ReentrantLock 和 synchronized 的区别
1、本质有区别。
都是JVM层面的互斥锁,一个是类,一个是关键字。
2、效率有区别。
如果竞争比较激烈,推荐ReentrantLock,不存在锁升级的操作。如果使用synchronized,一旦升级到重量级锁,不会出现锁降级。
3、实现原理有区别。
ReentrantLock是基于AQS实现的,synchronized是基于ObjectMonitor实现的。
4、功能有区别。
ReentrantLock的功能更全面,支持公平锁、可以指定等待的时间,这些都是synchronized没有的。
2、AQS概述
AQS就是AbstractQueuedSynchronizer类,是JUC包下的一个基类。JUC包下的很多工具都是基于AQS实现了部分功能。比如ReentrantLock、ThreadPoolExecutor、BlockingQueue、CountDownLatch、Semaphore、CyClicBarrier等。
AQS提供了一个由volatile修饰,并且使用CAS方式修改的int类型state变量。
AQS中维护了一个双向链表,由head、tail,每个节点都是Node对象。


3、加锁流程概述
流程基本分为三步,先做CAS获取锁资源,实在获取不到就将当前线程封装为Node节点,再放到双向链表中排队。
非公平锁:一进来先做CAS。
公平锁:一进来时发现锁被占有,直接进入等待链表中。
4、lock方法加锁源码分析
① lock 方法解析
// 非公平锁
final void lock() {// 一进来就做CAS操作,想将state从0修改为1if (compareAndSetState(0, 1))// 如果成功了就将获取锁的线程设置为自己setExclusiveOwnerThread(Thread.currentThread());else// 进入排队抢锁操作acquire(1);
}// 公平锁
final void lock() {// 直接进入排队抢锁操作acquire(1);
}
② acquire 方法解析
// acquire 方法解析
public final void acquire(int arg) {// tryAcquire:再次查看当前线程是否可以尝试获取锁资源if (!tryAcquire(arg) &&// addWaiter:将当前线程封装为Node节点,插入AQS的双向链表的末尾// acquireQueued:查看当前线程是否为双向链表的第一个节点// 如果是:再次尝试获取锁资源,如果长时间拿不到,就挂起线程// 如果不是:直接挂起线程acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// selfInterrupt:中断线程selfInterrupt();
}
③ tryAcquire 方法解析
// 非公平锁 - tryAcquire 方法解析
final boolean nonfairTryAcquire(int acquires) {// 获取当前线程final Thread current = Thread.currentThread();// 获取当前锁资源的state值int c = getState();// 为了预防上一个占有锁的线程离开,再次检查锁占有情况if (c == 0) {// 尝试使用CAS加锁if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果当前持有锁的线程与当前线程相同,那么就是锁重入else if (current == getExclusiveOwnerThread()) {// 给state + 1int nextc = c + acquires;// 判断重入次数是否超出int的最大值if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");// 给state + 1setState(nextc);// 加锁成功return true;}//加锁失败return false;
}// 公平锁 - tryAcquire
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// hasQueuedPredecessors:检查AQS的双向队列if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}// 检查AQS的双向队列中是否为空,如果不为空是否自己排第一
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// h != t:检查是否为空return h != t &&// 检查是否当前线程排第一((s = h.next) == null || s.thread != Thread.currentThread());
}
④ addWaiter 解析
// addWaiter 解析,以互斥锁的形式,将Node节点放入AQS的双向队列中
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果第一次尝试将Node节点放入队列失败,就进入enq方法enq(node);return node;
}// 使用死循环将Node节点放到AQS的双向队列末尾
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // 如果AQS的双向队列为空,就创建一个没意义的Node节点放到最前面if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
⑤ acquireQueued 方法解析
当前线程再次尝试获取锁资源,如果没有资格获取,或者获取锁资源失败了,就挂起当前线程。
// acquireQueued
// 在当前线程的Node节点进入AQS的双向队列之后,再次尝试获取锁,获取不到就挂起当前线程
final boolean acquireQueued(final Node node, int arg) {// 标识当前线程是否获取锁失败boolean failed = true;try {// 这里先不考虑中断操作boolean interrupted = false;// 1、进入死循环// 2、线程被唤醒后继续执行循环for (;;) {// 当前Node节点的前一个节点final Node p = node.predecessor();// 如果当前节点排第一名,就尝试获取锁资源// 获取成功就return// 获取失败就进入下一个if块,挂起线程if (p == head && tryAcquire(arg)) {// 将当前Node节点设置为头节点,并且转换为【伪节点】setHead(node);// 通知GC回收【老的头节点】p.next = null; // help GCfailed = false;return interrupted;}// shouldParkAfterFailedAcquire:// 第一次进入方法后,找到【在我前面、离我最近、状态正常】的节点// 通知前一个【正常】的节点我要挂起了if (shouldParkAfterFailedAcquire(p, node) &&// parkAndCheckInterrupt:挂起线程,等到被唤醒后继续从此处执行parkAndCheckInterrupt())// 这里先不考虑中断操作interrupted = true;}} finally {// lock方法永远不会获取锁失败,会一直挂起等待,知道获取锁成功// 除非JVM异常if (failed)// 这里先不看cancelAcquire(node);}
}// 寻找前一个正常的节点,并且通知它我要挂起了
// SIGNAL(-1):表示已经知道下一个Node节点需要唤醒
// CANCELLED(1):退出状态,表示该节点不参与抢锁了
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取前一个节点的状态int ws = pred.waitStatus;// 如果前一个节点已经知道我要挂起了if (ws == Node.SIGNAL)// 可以挂起return true;// 如果前一个节点异常了,或者退出操作了// 只有一个状态值大于0,就是退出状态if (ws > 0) {// 找到【在我前面、离我最近、状态正常】的节点do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);// 我链接到这个正常节点// 通知GC回收中间退出的Node节点pred.next = node;} else {// 通知前一个节点:我要挂起了// 修改前一个节点的状态为-1compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// 返回false,准备下次再进来return false;
}
5、tryLock方法加锁源码分析
tryLock无参方法解析
// tryLock 方法源码解析
public boolean tryLock() {return sync.nonfairTryAcquire(1);
}// 以非公平锁的方式尝试获取锁 - 不管成功与否,仅尝试一次
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 检查锁的占有情况if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入计数+1else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
tryLock有参方法解析
// tryLock带参数 - 源码解析
public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {// 将时间参数转换成纳秒进行操作return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}// tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {// 判断线程是否被中断了,如果中断就抛出异常if (Thread.interrupted())throw new InterruptedException();// 尝试使用CAS获取锁return tryAcquire(arg) ||// 如果CAS没获取到锁,就进入双向队列排队doAcquireNanos(arg, nanosTimeout);
}// 以自动过期的形式进入双向队列排队
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {// 如果设置的过期时间没超过0纳秒,证明无需等待锁if (nanosTimeout <= 0L)return false;// 计算等待的最终时间final long deadline = System.nanoTime() + nanosTimeout;// 将当前线程封装为Node放入双向队列中final Node node = addWaiter(Node.EXCLUSIVE);// 标识当前线程获取锁还未成功boolean failed = true;try {for (;;) {// 如果当前Node排第一,继续尝试获取锁final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}// 判断是否已经到达过期时间nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;// 通知前一个Node节点我要挂起了if (shouldParkAfterFailedAcquire(p, node) &&// 如果剩余抢锁时间不足1000纳秒,就无需进入等待 - 时间太短了nanosTimeout > spinForTimeoutThreshold)// 线程进入等待状态,到时间后自动唤醒LockSupport.parkNanos(this, nanosTimeout);// 如果线程醒了,判断是中断唤醒的,还是自动醒来的if (Thread.interrupted())// 如果是中断唤醒的就抛异常throw new InterruptedException();}} finally {// 如果获取锁失败if (failed)// 取消排队cancelAcquire(node);}
}
取消节点整体操作流程:
1、线程设置为null。
2、向前找到有效节点作为当前节点的prev
3、将waitStatus设置为1,代表取消排队。
4、脱离整个AQS队列:分三种情况
① 当前Node是tail
② 当前Node排第一
③ 当前Node不排第一也不是tail
// 如果当前线程被中断 - 那么当前线程的Node节点取消AQS排队
private void cancelAcquire(Node node) {// 如果当前线程的Node节点已被回收,就无需移出AQS队列if (node == null)return;// 将Node节点的线程设置为nullnode.thread = null;// 拿到当前节点的前置节点Node pred = node.prev;// 找到【在我前面、离我最近、状态正常】的节点while (pred.waitStatus > 0)node.prev = pred = pred.prev;// 目标前置节点的原始后置节点Node predNext = pred.next;// 将Node节点的等待状态设置为1 - 表示取消排队获取锁node.waitStatus = Node.CANCELLED;// 如果当前节点是tail节点,就将上一个有效节点修改为tail节点,如果修改失败就什么都不做// 如果是tail节点就直接移除if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// 移除tail节点失败或不是tail节点// 准备一个参数接收新的上一个节点的状态int ws;// 如果上一个节点是头节点,直接唤醒后面的节点if (pred != head &&// 如果本节点不排第一也不排最后// 拿到上一个节点的状态,并且修改为只要状态不是取消状态就修改为-1,通知它需要唤醒后面的节点((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&// 如果上一个节点的线程不是nullpred.thread != null) {// 拿到本节点的下一个节点Node next = node.next;if (next != null && next.waitStatus <= 0)// 如果下一个节点状态正常就让上一个节点直接指向下一个节点,从而跳过当前节点compareAndSetNext(pred, predNext, next);} else {// 如果当前Node排第一// 唤醒后面的节点unparkSuccessor(node);}// 回收当前Nodenode.next = node; // help GC}
}
6、lockInterruptibly方法加锁源码分析
使用这个方法获取锁时,不会自动退出排队,而是一直等待获取锁资源,直到拿到锁或者线程被中断。
// 这个方法是与tryLock(time,unit)方法唯一的区别
// 拿不到锁资源就一直等待,直到别人释放锁资源后被唤醒,或者被中断唤醒。
private void doAcquireInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 如果是被中断唤醒就抛异常throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}// 进入等待状态并且校验是否中断
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);// 如果是被中断唤醒就抛出异常return Thread.interrupted();
}
7、释放锁源码解析
① 释放锁流程概述
② 释放锁源码解析
// 释放锁源码解析
public void unlock() {sync.release(1);
}// 实现释放锁资源操作 - 由于是可重入锁,需要完全释放才返回true,否则表示继续持有锁
public final boolean release(int arg) {// 处理state的值,将state的值减去1if (tryRelease(arg)) {// 如果完全释放锁了,state == 0时// 尝试唤醒AQS队列的下一个线程Node h = head;// 如果AQS队列不为空,且头节点的状态是需要唤醒下一个节点的线程if (h != null && h.waitStatus != 0)// 唤醒下一个线程unparkSuccessor(h);return true;}return false;
}// 处理state的值,将state的值减1
protected final boolean tryRelease(int releases) {// 将state的值减1int c = getState() - releases;// 校验当前释放锁的线程是否是持有锁的线程if (Thread.currentThread() != getExclusiveOwnerThread())// 如果不是就抛异常throw new IllegalMonitorStateException();// 判断是否完全释放锁资源boolean free = false;if (c == 0) {free = true;// 如果完全释放锁资源就将持有锁的线程设置为nullsetExclusiveOwnerThread(null);}// 更新state的数目setState(c);// 返回是否完全释放return free;
}private void unparkSuccessor(Node node) {// 获取AQS队列的头节点的状态int ws = node.waitStatus;// 如果状态 == -1,将头节点的状态修改为0(初始状态)if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 获取头节点的下一个节点Node s = node.next;// 如果没获取到下一个节点或者状态是取消 - 就需要要找到一个正常状态的节点if (s == null || s.waitStatus > 0) {s = null;// 从AQS队列的末尾开始,逐个向前找到一个【离头节点最近、在头节点之后、状态正常】的节点// 直到找到这个节点 或者 一个也没找到// 从后向前寻找节点是由于Node节点入队时,新的Node节点会优先指向上一个节点,上一个节点后指向新的节点,避免在这个过程中间丢失对新的Node节点的查找for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果找到了需要唤醒的节点if (s != null)// 唤醒下一个节点的线程LockSupport.unpark(s.thread);
}
四、ReentrantLock的ConditionObject
1、介绍
在synchronized中,支持线程在持有锁时的wait、notify的操作,可以实现挂起、唤醒的操作,这个操作会让线程失去对锁的占有。
ReentrantLock基于内部类【ConditionObject】提供了await、signal方法实现了wait、notify的功能。
想执行await和signal方法就需要先持有锁。
2、ConditionObject的应用
package allwe.lock;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ConditionTest {private static ReentrantLock reentrantLock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {Condition condition = reentrantLock.newCondition();new Thread(() -> {reentrantLock.lock();System.out.println("子线程持有锁");try {Thread.sleep(3000);System.out.println("子线程挂起");condition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("子线程持有锁");}).start();Thread.sleep(100);reentrantLock.lock();System.out.println("主线程获取锁资源");Thread.sleep(3000);condition.signal();System.out.println("主线程唤醒子线程");Thread.sleep(3000);System.out.println("主线程释放锁");reentrantLock.unlock();}
}
3、Condition的构建方式和核心属性
在执行lock.newCondition()方法时,就是直接new的AQS提供的ConditionObject对象。
在同一个lock中可以保存多个Condition对象,对其中一个Condition对象做操作时不会影响其他的Condition对象。
ConditionObject中有两个核心属性:
private transient Node firstWaiter;
private transient Node lastWaiter;
这两个属性的具体类是AQS中的Node类,虽然Node类具有prev和next两个指针,但是ConditionObject并不使用它们,而是使用nextWaiter指针,来实现单向链表的效果。