加锁流程
公平锁加锁流程:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
加锁流程主要分三大步:
tryAcquire尝试获取锁
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
1、如果state==0,则首先判断一下需不需要排队hasQueuedPredecessors(),因为你获取锁的时候,有可能持有锁的线程刚好释放锁,这时你也要看一下有没有人在排队。判断是否需要排队的逻辑如下:
public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
如果队首==队尾(h == t),则同步队列为空,不需要排队,直接取获取锁;如果第一个排队的人(s = h.next)是自己(s.thread == Thread.currentThread()),则也不需要排队,尝试去获取锁;中间的判断条件:(s = h.next) == null主要是考虑到并发的情况,因为你在尝试加锁的时候,有可能别的线程已经在执行入队操作(调用addWaiter方法),addWaiter方法如下:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
入队操作构造节点Node信息是分好几步:①node.prev指向队尾②compareAndSetTail把为节点tail指向当前节点③pred.next = node;把头节点指向新加入节点。加入②执行完后,还没有执行到③,则其他线程判断(s = h.next)就有可能为空,因为此时连接还没有建立,但是此时你也是需要排队的
如果hasQueuedPredecessors()判断出不需要排队,则执行compareAndSetState(0, acquires)尝试cas改变state的值,改变成功则获取锁成功;否则执行锁重入逻辑:如果当前持有锁的线程是本线程,则state+1,state是int类型,最大值为2^31-1,如果超过此值,则抛异常,可见重入锁的最大重入次数为2^31-1。
获取锁失败就入队addWaiter
入队代码如下:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
if条件:pred != null表示当前队列已经被初始化了,则把自己放入队尾;如果队列没有被初始化,则执行enq(node):
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
死循环表示一定要入队成功
入队完了park
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
如果入队成功的节点的上一个节点是头节点(p == head),这家伙还不死心,再次尝试获取锁(tryAcquire),因为入队也是需要时间的,所以如果我排队第一,有可能前面的人已经释放了锁,我就不需要park了,再次尝试获取锁。如果获取不到,则把自己park掉。park掉之前有一个方法:shouldParkAfterFailedAcquire:
/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
此方法主要是改前一个节点的waitStatus变量。第一次进来int ws = pred.waitStatus;前一个节点的waitStatus=0,则执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL);改成-1,表示你执行完了要唤醒我!!然后返回false;第二次循环则if (ws == Node.SIGNAL)成立,直接返回true,执行parkAndCheckInterrupt()逻辑,调用内核阻塞当前线程。
park/unpark原理
park函数作用是将当前调用线程阻塞,unpark函数则是将指定线程线程唤醒。
每个线程都会关联一个Parker对象(C代码实现),每个Parker对象都各自维护了三个角色:_counter计数器,_mutex互斥量,_cond条件变量;
park是等待一个许可,unpark 是为某线程提供一个许可。如果某线程A调用park,那么除非另外一个线程调用unpark(A) 给A一个许可,否则线程 A 将阻塞在 park 操作上。每次调用一次 park,需要有一个unpark来解锁,并且unpark可以先于park调用,但是不管unpark先调用几次,都只提供一个许可,不可叠加,只需要一次park来消费掉unpark带来的许可,再次调用会阻塞。
park的执行过程简要如下:
mutex和condition保护了一个叫_counter的信号量。当park时,这个变量被设置为0,当 unpark 时,这个变量被设置为1。(1)调用park;(2)检查_counter,当 _counter=0 时,获取mutex互斥锁,线程阻塞,当 _counter > 0 直接设为 0 并返回;(3)将线程放到_cond阻塞队列中,此时线程处于阻塞状态;(4)虽然此时_counter已经等于0了,但还要再设置一遍,让其等于0
unpark简要流程:
(1)unpark直接设置_counter为1,再 unlock mutex 返回。(2)如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程;(3)Thread-0恢复运行;(4)设置_counter等于0
条件唤醒condition
调用reentrantLock.newCondition();会返回一个Condition对象,ReentrantLock维护了若干个条件队列,可以实现条件唤醒。
和synchronized的区别
1、synchronized可以修饰方法、代码块,而ReentrantLock只能修饰代码块;
2、synchronized自动加锁和释放锁,ReentrantLock需要手动加锁和释放锁;
3、synchronized是非公平锁,ReentrantLock默认是非公平锁,可以配置为公平锁;
4、synchronized不能响应中断,ReentrantLock可以;
5、synchronized搭配wait(),notify(),notifyAll()一起使用,锁池是先进后出的栈结构;ReentrantLock维护了一个等待队列和多个条件队列,
6、ReentrantLock支持条件唤醒,synchronized不支持