CompletableFuture异步编程
- CompletableFuture介绍
- 与传统 Future 的对比
- 使用方法
- 1. 使用 supplyAsync(有返回值)
- 使用 runAsync(无返回值)
- 指定自定义线程池
- 处理异步结果
- 1. thenApply:转换结果
- 2.thenAccept:消费结果
- 3.thenRun:完成后执行操作
- 组合任务
- 1. thenCompose:串联两个任务
- 2. thenCombine:合并两个任务结果
- 3. allOf:等待所有任务完成
- 4. anyOf:任意一个任务完成
- 异常处理
- 1. exceptionally:捕获异常并返回默认值
- 2. handle:无论成功/失败都处理
- 3. whenComplete:记录日志但不修改结果
- 完整示例:链式调用 + 异常处理
- 关键点总结
CompletableFuture介绍
1.基础概念
CompletableFuture 是 Java 8 引入的一个类,用于表示异步计算的结果。它实现了 Future 接口,但比传统的 Future 更强大,支持:
-
非阻塞操作:通过回调函数处理结果,无需手动调用 get() 阻塞线程。
-
链式编程:将多个异步任务串联或并联,形成复杂的执行流水线。
-
异常处理:提供统一的异常捕获和恢复机制。
2. 核心思想
-
异步编程:将耗时的操作(如I/O、网络请求)交给其他线程执行,主线程继续处理其他任务。
-
函数式风格:通过 thenApply、thenAccept 等方法,以声明式的方式组合任务。
3. 关键特点
-
回调驱动:任务完成后自动触发后续操作。
-
线程池集成:支持自定义线程池,避免资源竞争。
-
结果依赖管理:轻松处理多个任务之间的依赖关系(如A任务的结果是B任务的输入)。
与传统 Future 的对比
特性 | Future | CompletableFuture |
---|---|---|
结果获取 | 阻塞调用 get() | 非阻塞回调(thenAccept) |
任务组合 | 需要手动轮询 | 链式调用(thenApply、thenCompose) |
异常处理 | 需在调用代码中处理 | 内置 exceptionally、handle |
线程控制 | 依赖 ExecutorService | 支持自定义线程池 |
适用场景 | 简单的异步任务 | 复杂的异步流水线 |
使用方法
1. 使用 supplyAsync(有返回值)
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);}catch (InterruptedException e){e.printStackTrace();}return "00";});// 获取结果(阻塞)String result = future.get();System.out.println("result:"+result);}
}
使用 runAsync(无返回值)
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("sleep 1m");});//等待任务完成completableFuture.get();}
}
指定自定义线程池
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(2);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "Custom Thread Pool";}, executor);String s = future.get();System.out.println(s);}
}
处理异步结果
1. thenApply:转换结果
thenApply 方法用于在 CompletableFuture 完成时应用一个函数,并返回计算的结果。它返回一个新的 CompletableFuture,该 CompletableFuture 的类型由函数返回值的类型决定。
语法:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").thenApply(s -> s + " World");String s = future.get();System.out.println(s);}
输出
2.thenAccept:消费结果
thenAccept 方法用于在 CompletableFuture 完成时执行一个消费者(Consumer)操作,但不返回任何值(即它的返回类型是 void)。这通常用于执行一些副作用,比如打印日志、更新UI等,而不关心计算的结果。
语法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
示例:
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello").thenAccept(s -> System.out.println("Result: " + s));future.get();}
}
输出
3.thenRun:完成后执行操作
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello").thenRun(() -> System.out.println("Task finished"));future.get();}
}
输出
组合任务
1. thenCompose:串联两个任务
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->{String a = "Hello";System.out.println(Thread.currentThread().getName() + "-a:" + a);return a;}).thenCompose(s -> CompletableFuture.supplyAsync(() -> {String r = " World";System.out.println(Thread.currentThread().getName() + "-r:" + r);return s + r;}));String s = future.get();// "Hello World"System.out.println(Thread.currentThread().getName() + "s:"+s);}
}
输出:
2. thenCombine:合并两个任务结果
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {String h = "Hello";System.out.println(Thread.currentThread().getName() + " h:" + h);return h;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() ->{String w = " World";System.out.println(Thread.currentThread().getName() + " w:" + w);return w;});CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);String s = combined.get();System.out.println(Thread.currentThread().getName() + " s:" + s);}
}
输出:
3. allOf:等待所有任务完成
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Task1");CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2");CompletableFuture<Void> all = CompletableFuture.allOf(task1, task2);all.thenRun(() -> System.out.println("All tasks completed"));}
}
输出
4. anyOf:任意一个任务完成
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);}catch (InterruptedException e){e.printStackTrace();}return "Task1";});CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2");CompletableFuture<Object> any = CompletableFuture.anyOf(task1, task2);any.thenAccept(result -> System.out.println("First result: " + result)); // 输出 "Task2"}
}
输出
异常处理
1. exceptionally:捕获异常并返回默认值
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) throw new RuntimeException("Error!");return "Success";}).exceptionally(ex -> "Fallback Value");String s = future.get();// 返回 "Fallback Value"System.out.println(s);}
}
2. handle:无论成功/失败都处理
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (new Random().nextBoolean()) throw new RuntimeException("Error!");return "Success";}).handle((result, ex) -> {if (ex != null) return "Fallback";return result;});}
3. whenComplete:记录日志但不修改结果
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").whenComplete((result, ex) -> {if (ex != null) ex.printStackTrace();else System.out.println("Result: " + result);});String s = future.get();System.out.println(s);}
}
完整示例:链式调用 + 异常处理
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {// 步骤1:获取用户IDreturn 123;}).thenApply(userId -> {// 步骤2:根据用户ID查询名称if (userId == 123) return "Alice";else throw new IllegalArgumentException("Invalid User ID");}).thenApply(userName -> {// 步骤3:转换为大写return userName.toUpperCase();}).exceptionally(ex -> {// 统一异常处理System.out.println("Error: " + ex.getMessage());return "DEFAULT_USER";}).thenAccept(finalResult -> {System.out.println("Final Result: " + finalResult); // 输出 "ALICE" 或 "DEFAULT_USER"});}
}
输出
关键点总结
异步执行:使用 supplyAsync/runAsync 启动异步任务。
链式调用:通过 thenApply/thenAccept/thenRun 串联操作。
组合任务:thenCompose(依赖)和 thenCombine(并行)合并结果。
异常处理:优先使用 exceptionally 或 handle 提供容错。
线程池控制:避免使用默认线程池处理阻塞任务(如I/O)