Mono 和Flux是响应式编程库 Reactor 的核心组件,主要用于处理异步流和响应式数据流。它们的设计基于 Reactive Streams 规范,提供了非阻塞的方式处理数据。
Mono
Mono 表示0或1个元素的数据流。它适用于返回单个值或没有值的场景,例如:
- 数据库查询的单行结果。
- HTTP 请求的单次响应。
- 异步计算的结果
特点
- 单元素流:Mono 只能发射一次数据,之后可以发射完成信号或错误信号。
- 懒加载:只有在有订阅者时才开始计算或执行流操作。
创建 Mono
import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {// 创建一个包含数据的 MonoMono<String> mono = Mono.just("Hello, Mono!");// 空的 MonoMono<Object> emptyMono = Mono.empty();// 错误的 MonoMono<Object> errorMono = Mono.error(new RuntimeException("Error occurred"));// 订阅以触发流mono.subscribe(System.out::println); // 输出: Hello, Mono!}
}
常用操作符
- map:对数据进行变换
- flatMap:将数据转换为另一个异步流
- doOnNext:在数据发射时执行额外操作
- onErrorResume:处理错误并提供备用流
Mono.just("Reactive Programming").map(String::toUpperCase).doOnNext(System.out::println).onErrorResume(e -> Mono.just("Fallback")).subscribe();
Flux
Flux 表示 0到N个元素 的数据流。它适用于需要处理多个元素的场景,例如:
- 数据库查询的多行结果
- 消息队列中的消息流
- 实时事件流
特点
- 多元素流:Flux 可以发射多个元素,之后发射完成信号或错误信号
- 懒加载:与 Mono 一样,只有在订阅时才开始执行流操作
创建 Mono
import reactor.core.publisher.Flux;public class FluxExample {public static void main(String[] args) {// 创建一个包含多个元素的 FluxFlux<String> flux = Flux.just("A", "B", "C");// 从数组创建 FluxFlux<Integer> fluxFromArray = Flux.fromArray(new Integer[]{1, 2, 3});// 生成范围的 FluxFlux<Integer> fluxRange = Flux.range(1, 5);// 订阅以触发流flux.subscribe(System.out::println); // 输出: A B C}
}
常用操作符
- filter:过滤数据
- map:对数据进行变换。
- flatMap:将每个元素映射为新的流
- buffer:将元素收集到批次中
- onErrorContinue:忽略错误并继续处理剩余数据
Flux.range(1, 10).filter(i -> i % 2 == 0).map(i -> "Number: " + i).subscribe(System.out::println);
Mono 与 Flux 的关系
- Mono 是 Flux 的特例:Mono 仅处理 0 或 1 个元素,而 Flux 可以处理任意数量的元素。
- 许多操作符(例如 map, filter, flatMap)在 Mono 和 Flux 中是通用的
使用场景比较
特性 | Mono | Flux |
---|---|---|
数据量 | 0或1个元素 | 0到N 个元素 |
使用场景 | 单个对象、HTTP 响应、单次计算等 | 数据列表、流式数据、事件流等 |