文章目录
- 背景
- CompletableFuture定义与用法
- Completion
- thenApplyAsync方法
- run()方法
- postComplete()
背景
Java的异步并发Future
接口表示异步计算的结果。CompletableFuture
是对Future
接口的增强,它实现CompletionStage
接口,允许链式组合异步操作,组合多个异步任务的结果,处理异常情况,任务结束时执行回调方法。
CompletableFuture定义与用法
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {volatile Object result; // Either the result or boxed AltResultvolatile Completion stack;
}
CompletableFuture
类有2个成员变量,volatile
修饰符保证多线程环境下变量可见性。result
表示当前任务的结果,可能是正常结果,可能是异常对象。stack
是当前任务确定结果后接下来所有执行的任务栈。
CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
常用的创建方法中,supplyAsync
有返回值,runAsync
没有返回值(返回值类型为Void
)。async
表示异步,即runnable
任务不由调用线程执行,而是由线程池执行。
默认线程池是ForkJoinPool
,这是一个全局唯一的线程池。为了管理任务,建议对不同类型任务分别分配线程池。
Completion
abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;abstract CompletableFuture<?> tryFire(int mode);abstract boolean isLive();public final void run() { tryFire(ASYNC); }public final boolean exec() { tryFire(ASYNC); return false; }public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}
}
Completion
是CompletableFuture
的内部类。CompletableFuture
强调存储任务结果,而Completion
强调计算单个任务
以及组合多个任务
,强调动作,所以它继承Runnable
,可以异步执行。例子:
ExecutorService executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
CompletableFuture base = new CompletableFuture();
base.thenApply(u -> {return "h";
});
base.thenAcceptAsync(s -> System.out.println(s), executor);
base.thenRunAsync(() -> System.out.println("c"), executor);
// 时间点1
base.complete("s");
程序运行到时间点1,base
任务还没完成(等base.complete("s")
结束base
对象的result
=s
才算完成),它将三个thenXxx()
方法装在栈里,以便将来调用,关系如图。每个thenXxx()
方法都对应一个Completition
对象,后调用的方法在栈顶,先调用的在栈底。栈的底层是链表。stack
表示当前任务完成后将要执行的任务,即栈顶元素。
注意:这里的栈是Treiber stacks
,用CAS原子性更改栈顶元素,实现线程安全地更改栈。
源码中thenApply()
方法对应的不是Completion
类本身,而是它的子类UniApply
类。Completion
的子类很多,如下图,它们的区别是
- 输入不同。比如
Uni
开头的子类表示1个输入,Bi
开头的子类表示2个输入。比如UniApply
接收Function
类型任务,UniRun
接受Runnable
类型任务。 - 功能不同。比如
BiRun
表示两个输入都已运算结束才行,OrRun
表示其中1个结束就行。比如Signaller
类用于当前线程挂起后(比如get()
方法)将来唤醒本线程(就是执行LockSupport.unpark(本线程对象)
)。
图片来自https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html。
thenApplyAsync方法
以CompletableFuture#thenApplyAsync()
为例,分析执行流程。如果前置任务已经有结果,那么不加入stack
栈,直接运算,如果前置任务还没算完,那么用unipush()
方法加入栈。
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {// screenExecutor()方法的作用是校验线程池参数return uniApplyStage(screenExecutor(executor), fn);
}
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();Object r;if ((r = result) != null) // `result`是前置任务的结果,即本次调用`thenApplyAsync()`方法的`CompletableFuture`对象的`result`变量,即上面代码中base对象的result变量。return uniApplyNow(r, e, f); // `uniApplyNow()`方法的含义是:本线程立即执行,不加入`stack`栈CompletableFuture<V> d = newIncompleteFuture(); // 源码中有非常多`d`,都表示本次计算的结果unipush(new UniApply<T,V>(e, d, this, f));return d;
}
private <V> CompletableFuture<V> uniApplyNow(Object r, Executor e, Function<? super T,? extends V> f) {Throwable x;CompletableFuture<V> d = newIncompleteFuture(); // d表示本次计算任务的返回结果if (r instanceof AltResult) { // 如果前置任务抛出异常if ((x = ((AltResult)r).ex) != null) {d.result = encodeThrowable(x, r); // 当前任务也不执行`Function`方法,直接抛出同样异常return d;}r = null;}try {if (e != null) {e.execute(new UniApply<T,V>(null, d, this, f)); // 交给线程池执行`UniApply`的`run()`方法。第一个参数null是线程池对象,此时已经不需要了。第二个参数`d`是本次任务的结果对象。第三个参数是前置任务的结果对象,即本次调用`thenApplyAsync()`方法的`CompletableFuture`对象。第四个参数是本次任务的方法体。} else {@SuppressWarnings("unchecked") T t = (T) r;d.result = d.encodeValue(f.apply(t)); // 本线程直接执行。t是上次任务的正常结果,d.result是本次任务的正常}} catch (Throwable ex) {d.result = encodeThrowable(ex); // 本次任务如果异常,那么本次任务结果是异常对象}return d;
}
static final class AltResult { // See above // AltResult 是异常包装类final Throwable ex; // null only for NILAltResult(Throwable x) { this.ex = x; }
}
// `UniApply`对象的构造器方法。`src`是前置任务的结果,`dep`(方法中写作`d`)是本次任务的结果,`fn`是本次任务的方法体。
UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;
}
unipush()
方法,不断自旋将任务压入栈。
final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) {// 运行至此,入栈失败if (result != null) { // result != null表示前置任务出结果了NEXT.set(c, null); // 取消`tryPushStack()`方法的`NEXT.set(c, h)`break;}// 运行至此,入栈失败,且前置任务没出结果,则不断自旋}// 运行至此,要么入栈成功,要么前置任务出结果了if (result != null)// 运行至此,前置任务出结果,那么不入栈,直接执行本次方法体,这里执行的不是`run()`方法,而是`trynFire(SYNC)`方法。c.tryFire(SYNC); }
}
final boolean tryPushStack(Completion c) {Completion h = stack; // stack是前置任务的栈顶元素,即this.stackNEXT.set(c, h); // CAS piggyback // 这句语义等于`c.next = h`,而`NEXT`是`VarHandle`对象,能线程安全并且高效地更改对象的值return STACK.compareAndSet(this, h, c);// 如果返回true,表示当前`CompletableFuture`对象的`stack`由h,线程安全地改成了c.// stack = h, 本次CAS自旋改成了 stack = c, c.next = h。
}
run()方法
abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {abstract CompletableFuture<?> tryFire(int mode);public final void run() { tryFire(ASYNC); }
}
class CompletableFuture {static final int SYNC = 0;static final int ASYNC = 1;static final int NESTED = -1;
}
Completition
类定义run()
方法,内部执行的是子类覆写的tryFire(int mode)
方法。try
表示尝试执行,可能执行失败。int
型mode
变量其实是个枚举量,只有3个值。
SYNC
,同步,如果没有得到结果,本方法调用不返回。ASYNC
,异步,就算没有得到结果,本方法调用也返回。NESTED
,内嵌,postComplete()
方法独有的。
mode
不同,tryFire(int mode)
方法执行流程也不同。以UniApply
类的tryFire(int mode)
为例。
static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn; // 成员变量`fn`,本次任务的方法体// 构造器方法UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a; Object r; Throwable x; Function<? super T,? extends V> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)// 运行至此,(a = src) == null为true,(d = dep) == null为true,(f = fn) == null)为true都表示已执行完`src = null; dep = null; fn = null;`语句,当前`UniApply`已被清空,任务已被完成// (r = a.result) == null为true表示前置任务还没完成,还不能执行当前操作return null; tryComplete: if (d.result == null) { // 当前任务结果为空if (r instanceof AltResult) { // 前置任务结果异常,则当前任务也异常if ((x = ((AltResult)r).ex) != null) {d.completeThrowable(x, r);break tryComplete;// 跳出嵌套,下一行执行`src = null; dep = null; fn = null;`}r = null; // 及时清空变量,帮助GC}try {// mode <= 0 为true表示状态不是ASYNC,即非异步执行// !claim()为true表示该任务已经被执行过了// 因此返回if (mode <= 0 && !claim())return null;else {@SuppressWarnings("unchecked") T t = (T) r;// 本线程直接方法体,t是前置任务的结果d.completeValue(f.apply(t));}} catch (Throwable ex) {d.completeThrowable(ex);}}src = null; dep = null; fn = null;// 执行至此,本次任务结束,将成员变量`src, dep, fn`都设为null,表示当前任务已完成。// 比如别的线程调用此对象的`isLive()`方法(),就返回dep对象是否为空。// final boolean isLive() { // return dep != null; // }return d.postFire(a, mode);}
}
// UniCompletion<T,V>#claim(),其中UniCompletion是UniApply的父类
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// CAS原子性修改标志位,表示只有1个线程能够执行本`Completion`类// 运行至此,表示本`Completion`类第一次被执行if (e == null)return true; // 本线程直接执行,返回trueexecutor = null; // disable 成员变量executore.execute(this); // 交给线程池执行,跳出if语句,返回的是false}return false;
}
// CompletableFuture#postFire()
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {if (a != null && a.stack != null) {Object r;if ((r = a.result) == null) // if语句为true表示当前对象未完成a.cleanStack(); // 清理a对象`stack`栈中空对象,即已经完成的对象if (mode >= 0 && (r != null || a.result != null)) // if语句为true表示当前a任务已完成,状态为`SYNC, ASNYC`而不是`NESTED`// a任务已经完成,现在要处理a对象的下游(依赖)对象a.postComplete();}if (result != null && stack != null) {// mode < 0 为true表示状态是`NESTED`,返回当前任务对象,`postComplete()`方法会用到if (mode < 0)return this;elsepostComplete(); // 当前任务已经完成,现在要处理当前任务的下游(依赖)对象}return null;
}
postComplete()
进入postComplete()
方法的前提是当前任务已经有结果了,即this.result != null
。
final void postComplete() {/** On each step, variable f holds current dependents to pop* and run. It is extended along only one path at a time,* pushing others to avoid unbounded recursion.*/CompletableFuture<?> f = this; Completion h; // f是当前任务,h是栈顶元素while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}NEXT.compareAndSet(h, t, null); // try to detach}f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}
以图为例,执行base.postComplete()
方法。
第一次进入while
循环,h = thenComp1, f = base
,执行f = (d = h.tryFire(NESTED)) == null ? this : d;
后f=comp1Result
(原因是tryFire(NESTED)
方法不会链式执行,只执行1个任务就返回结果)。
第二次进入while
循环,f=comp1Result, h = comp1Thencomp1
,由于f != this
,执行pushStack(h)
后h
离开comp1Result
,进入base
的stack
。
continue
表示会第三次进入while
循环,将comp1Thencomp2
也加入到base
的stack
。可以看到,comp1Result
的栈元素是倒序之后进入base
的栈。
第四次循环h != null
为false
,h
重新赋值为当前base
的stack
。开始执行comp1Thencomp2
任务。