文章目录
- 类图结构及概要
- 核心方法
- acquire() 方法
- acquire(int permits) 方法
- acquireUninterruptibly() 方法
- acquireUninterruptibly(int permits)方法
- release() 方法
- release(int permits) 方法
- 总结
类图结构及概要
Semaphore 信号量也是 Java 中的 一 个同步器,与 CountDownLatch 和 CycleBarrier 不同的是,它内部的计数器是递增的,并且在一开始初始化 Semaphore 时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用 acquire 方法时指定需要同步的线程个数。
由该类图可知, Semaphore 还是使用 AQS 实现的。 Sync 只是对 AQS 的一个修饰,并且 Sync 有两个实现类,用来指定获取信号量时是否采用公平策略。
``
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
``
在如上代码 中 , Semaphore 默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造 Semaphore 对象。另外 ,如 CountDownLatch 构造函数传递的初始化信号量个数 permits 被赋给了 AQS 的 state 状态变量一样,这里 AQS 的 state 值也表示当前持有的信号量个数 。
核心方法
acquire() 方法
当前线程调用 该方法的目的是希望获取一个信号量资源。 如果 当前信号量个数大于 0,则 当前信号量的计数会减 1 , 然后该方法直接返回。否则如果当前信号量个数等于 0,则当前线程会被放入 AQS 的阻塞队列。当其他线程调用了当前线程 的 interrupt ()方法 中断了当前线程时 ,则当前线程会抛出 InterruptedException 异常返回。下面看下代码实现:
``
public void acquire() throws InterruptedException {
// 传递参数为 1 ,说明要获取1个信号量资源
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// (1) 如果线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// (2) 否则调用 Sync子类方法尝试获取, 这里根据构造函数确定使用公平策略
if (tryAcquireShared(arg) < 0)
// 如果获取失败则放入阻塞队列 。 然后再次 尝试,如果失败则调用 park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
``
由如上代码可知, acquire() 在内部调用了 Sync 的 acquireSharedlnterruptibly 方法,后者会对中断进行响应(如果当前线程被中断, 则抛出中断异常) 。尝试获取信号量资源的AQS 的方法 tryAcquireShared 是 由 Sync 的子 类实 现的,所以这 里分 别从两 方 面来讨论 。先讨论非公平策略 NonfairSync 类的tryAcquireShared 方法,代码如下 。
``
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;😉 {
// 获取当前信号量值
int available = getState();
// 计算 当前剩余值
int remaining = available - acquires;
// 如果当前剩余位小于 0或者CAS设置成功则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
``
如上代码先获取 当 前信号量值 ( available ),然后减去需要获取的值( acquires ) , 得到剩余的信号量个数( remaining ),如果剩余值小于 0 则说明当前信号量个数满足不了需求,那么直接返回负数 , 这时当前线程会被放入 AQS 的阻塞队列而被挂起 。 如果剩余值大于 o,则使用 CAS 操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于 NonFairSync 是非公平获取的,也就是说先调用 aquire 方法获取信号量 的线程不一定比后来者先获取到信号量。 考虑下面场景,如果线程 A 先调用了 aquire ()方法获取信号量,但是当前信 号量个数为 0 , 那么线程 A 会被放入 AQS 的阻塞队列 。 过一段时间后线程 C 调用了 release ( )方法释放了 一个信号量,如果当前没有其他线程获取信号量 , 那么线程 A 就会被激活,然后获取该信号量 , 但是假如线程 C 释放信号量后,线程 C 调用了 aquire 方法,那么线程 C 就会和线程 A 去竞争这个信号量资源 。 如果采用非公平策略,由 nonfairTryAcquireShared 的代码可知,线程 C 完全可以在线程 A 被激活前,或者激活后先于线程 A 获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。下面看公平性的 FairSync 类是如何保证公平性的 。
``
protected int tryAcquireShared(int acquires) {
for (;😉 {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
``
可见公平性还是靠 hasQueuedPredecessors 这个函数来保证的 。 公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则 自己放弃获取的权限,然后当前线程会被放入 AQS 阻塞队列,否则就去获取。
acquire(int permits) 方法
该方法与 acquire()方法不同,后者只需要获取一个信号量值, 而前者则获取 permits 个 。
``
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
``
acquireUninterruptibly() 方法
该方法与 acquire() 类 似,不同之处在于该方法 对中断不响应,也就是当当前线程调用了 acquireUninterrupti bly 获取 资 源时(包含被阻 塞后 ),其他线程调用了当前线程的 interrupt ( )方法设置了当前线程的中断标志,此时 当 前线程并不会抛出InterruptedException 异常而返回。
``
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
``
acquireUninterruptibly(int permits)方法
该方法与 acquire(int permits) 方法的不同之处在于,该方法对中断不响应。
``
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
``
release() 方法
该方法的作用 是把当前 Semaphore 对象的信号量值增加 1,如果当前有线程因为调用aquire 方法被阻塞 而被放入了 AQS 的阻塞 队列,则会根据公平策略选择 一个信号 量 个数能被满足的线程进行激活 , 激活的线程会尝试获取刚增加的信号量,下面看代码实现
``
public void release() {
// (1) arg= 1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// (2) 尝试择放资源
if (tryReleaseShared(arg)) {
// (3) 资源释放成功则调用 park方法唤醒AQS 队列里面最先挂起的线程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;😉 {
// (4) 获取 当 前信号量值
int current = getState();
// (5) 将当前信号量值增加 releases ,这里为增加 1
int next = current + releases;
if (next < current) // overflow
throw new Error(“Maximum permit count exceeded”);
// (6) 使用 CAS保证更新信号量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
``
由代码 release() ->sync.releaseShared( 1 ) 可 知, release 方法每次只会对信号量值增加1,tryReleaseShared 方法是无限循环,使用 CA S 保证了 release 方法对信号量递增 1 的原子性操作。 tryReleaseShared 方法增加信号量值成功后会执行代码 (3),即调用 AQS 的方法来激活因 为调用 aquire 方法而被阻塞的线程 。
release(int permits) 方法
该方法与不带参数的 release 方法的不 同之处在于,前者每次调用会在信号量值原来的基础上增加 permits ,而后者每次增加 1 。
``
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
``
另外可 以看到,这里的 sync .releaseShared 是共享方法,这说明该信号量是线程共享 的,信号量没有和固定线程绑定 , 多个线程可 以 同时使用 CAS 去更新信号量的值而不会被阻塞。
总结
Semaphore 完全可以达到CountDownLatch 的效果,但是 Semaphore 的计数器是不可以自动重置的 , 不过通过变相地改变 aquire 方法的参数还是可 以实现 CycleBanier 的功能的 。Semaphore 也是使用 AQS 实现的 , 并且获取信号量时有公平策略和非公平策略之分。