您的位置:首页 > 健康 > 养生 > Netty组件学习-Future 和 Promise

Netty组件学习-Future 和 Promise

2024/10/6 10:34:21 来源:https://blog.csdn.net/qq_46863837/article/details/139089158  浏览:    关键词:Netty组件学习-Future 和 Promise

Future & Promise

在异步处理时,经常用到这两个接口
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

方法API

功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

JDK的Future案例

同步处理任务成功
jdk的future主要的方法就是get拿结果

@Slf4j
public class JDKFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);Future<Integer> future = executorService.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {Thread.sleep(1000);log.info("执行完毕");return 10;}});log.info("结果是→{}", future.get());}}

Netty的Future案例

  • 要注意如果使用sync去阻塞,如果运行时抛出异常则会直接在sync处抛出
  • 使用await阻塞,即使抛出了异常也不会影响程序运行,可以通过future.cause()获取异常
  • getNow相比于JDK的get是不会阻塞的,所以可以在await后去根据isSuccess是否执行成功来获取结果或者获取异常
public class NettyFuture {public static void main(String[] args) throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {public Integer call() throws Exception {Thread.sleep(1000);if (true) {throw new RuntimeException("test");}log.info("执行完毕");return 10;}});future.await();if (future.isSuccess()) {log.info("执行成功→{}", future.getNow());} else {log.error("执行失败→{}", future.cause().getMessage());}}}

Netty的Promise

  • 可以通过Promise的setFailure设置结果
  • Promise可以看作是一个放置结果的容器,他的方法也都是围绕其结果的,比如get,getNow,await等,这些阻塞方法或者非阻塞方法都是在promise设置了结果或者设置了异常后完成功能
  • submit返回的Callable更像是,线程主动为我们设置了方法的返回值或者异常,而promise则是我们手动去完成这些操作
DefaultEventLoop eventExecutors = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);eventExecutors.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException e = new RuntimeException("error...");log.debug("set failure, {}", e.toString());promise.setFailure(e);});log.debug("start...");log.debug("{}", promise.getNow());promise.get(); // sync() 也会出现异常,只是 get 会再用 ExecutionException 包一层异常

await 死锁检查案例

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);eventExecutors.submit(()->{System.out.println("1");try {promise.await();// 注意不能仅捕获 InterruptedException 异常// 否则 死锁检查抛出的 BlockingOperationException 会继续向上传播// 而提交的任务会被包装为 PromiseTask,它的 run 方法中会 catch 所有异常然后设置为 Promise 的失败结果而不会抛出} catch (Exception e) { e.printStackTrace();}System.out.println("2");
});
eventExecutors.submit(()->{System.out.println("3");try {promise.await();} catch (Exception e) {e.printStackTrace();}System.out.println("4");
});

输出

1
2
3
4
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)at com.itcast.oio.DefaultPromiseTest.lambda$main$0(DefaultPromiseTest.java:27)at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)at java.lang.Thread.run(Thread.java:745)
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)at com.itcast.oio.DefaultPromiseTest.lambda$main$1(DefaultPromiseTest.java:36)at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)at java.lang.Thread.run(Thread.java:745)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com