您的位置:首页 > 教育 > 锐评 > 旅游必去的10个地方_怎么做境外电商平台_推销网站_软文营销文章范文

旅游必去的10个地方_怎么做境外电商平台_推销网站_软文营销文章范文

2025/4/5 10:06:18 来源:https://blog.csdn.net/sixpp/article/details/146905980  浏览:    关键词:旅游必去的10个地方_怎么做境外电商平台_推销网站_软文营销文章范文
旅游必去的10个地方_怎么做境外电商平台_推销网站_软文营销文章范文

在这里插入图片描述

文章目录

    • 3.3 Project Reactor(Spring Reactor)
      • 3.3.1 Mono(0-1个数据流)
        • 核心特性
        • 创建Mono的方式
        • 常用操作符
        • 订阅Mono
        • 实际应用场景
      • 3.3.2 Flux(0-N个数据流)
        • 核心特性
        • 创建Flux的方式
        • 常用操作符
        • 订阅Flux
        • 实际应用场景
      • 3.3.3 Schedulers(调度器)
        • 核心概念
        • 预定义的Scheduler类型
        • 使用Scheduler
        • 调度策略选择指南
        • 最佳实践
        • 实际应用示例
      • 总结

在这里插入图片描述

3.3 Project Reactor(Spring Reactor)

在这里插入图片描述

Project Reactor是Spring生态系统中的响应式编程库,它为构建非阻塞、异步和事件驱动的应用程序提供了强大的工具集。作为Spring WebFlux的默认响应式库,Reactor实现了Reactive Streams规范,使开发者能够以声明式的方式处理异步数据流。

3.3.1 Mono(0-1个数据流)

Mono是Project Reactor中表示最多包含一个元素的异步序列的核心类型。它代表了一种可能在未来某个时间点产生单个值(或空值)的异步计算。

核心特性
  1. 单值或空序列:Mono要么发出一个元素,要么不发出任何元素(仅发出完成信号)
  2. 延迟执行:Mono的操作通常是惰性的,只有在订阅时才会执行
  3. 异步处理:Mono支持非阻塞的异步处理模式
  4. 错误处理:提供了丰富的错误处理机制
创建Mono的方式

在这里插入图片描述

// 1. 从值创建
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.justOrEmpty(null); // 允许空值// 2. 从Supplier创建
Mono<String> mono3 = Mono.fromSupplier(() -> "Hello from Supplier");// 3. 从Callable创建
Mono<String> mono4 = Mono.fromCallable(() -> "Hello from Callable");// 4. 从Future创建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello from Future");
Mono<String> mono5 = Mono.fromFuture(future);// 5. 空Mono
Mono<Void> mono6 = Mono.empty();// 6. 错误Mono
Mono<String> mono7 = Mono.error(new RuntimeException("Something went wrong"));
常用操作符

在这里插入图片描述

  1. 转换操作符

    • map:同步转换值
    • flatMap:异步转换值(返回另一个Mono)
    • flatMapMany:将Mono转换为Flux
    Mono<String> original = Mono.just("Hello");
    Mono<Integer> mapped = original.map(String::length);
    Mono<String> flatMapped = original.flatMap(s -> Mono.just(s + " World"));
    Flux<String> flatMapMany = original.flatMapMany(s -> Flux.just(s.split("")));
    
  2. 过滤操作符

    • filter:基于条件过滤
    • defaultIfEmpty:如果为空提供默认值
    Mono<String> filtered = Mono.just("Hello").filter(s -> s.length() > 3);Mono<String> withDefault = Mono.<String>empty().defaultIfEmpty("Default Value");
    
  3. 错误处理操作符

    • onErrorReturn:出错时返回默认值
    • onErrorResume:出错时切换到备用的Mono
    • onErrorMap:转换错误类型
    • retry:重试操作
    Mono<String> withErrorHandling = Mono.error(new RuntimeException()).onErrorReturn("Fallback Value");Mono<String> withResume = Mono.error(new RuntimeException()).onErrorResume(e -> Mono.just("Recovered from " + e.getMessage()));
    
  4. 组合操作符

    • zipWith:与另一个Mono组合
    • then:忽略当前结果,执行另一个操作
    Mono<String> first = Mono.just("Hello");
    Mono<String> second = Mono.just("World");
    Mono<String> zipped = first.zipWith(second, (f, s) -> f + " " + s);Mono<Void> thenOperation = Mono.just("Hello").then(Mono.empty());
    
  5. 时间相关操作符

    • delayElement:延迟发出元素
    • timeout:设置超时时间
    Mono<String> delayed = Mono.just("Hello").delayElement(Duration.ofSeconds(1));Mono<String> withTimeout = Mono.just("Hello").delayElement(Duration.ofSeconds(2)).timeout(Duration.ofSeconds(1));
    
订阅Mono

在这里插入图片描述

Mono是惰性的,只有订阅时才会执行:

// 1. 简单订阅
mono.subscribe();// 2. 带消费者的订阅
mono.subscribe(value -> System.out.println("Received: " + value), // onNexterror -> System.err.println("Error: " + error),   // onError() -> System.out.println("Completed")             // onComplete
);// 3. 带Subscription控制的订阅
mono.subscribe(new BaseSubscriber<String>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("Subscribed");request(1); // 请求第一个元素}@Overrideprotected void hookOnNext(String value) {System.out.println("Received: " + value);request(1); // 请求下一个元素(对于Mono通常不需要)}
});
实际应用场景

在这里插入图片描述

  1. HTTP请求响应:在WebFlux中,控制器方法可以返回Mono来表示异步的单个响应
  2. 数据库操作:Spring Data Reactive Repositories返回Mono用于单个实体的CRUD操作
  3. 缓存查询:从缓存中获取单个值
  4. 异步任务:执行返回单个结果的异步计算
@RestController
public class UserController {private final UserRepository userRepository;public UserController(UserRepository userRepository) {this.userRepository = userRepository;}@GetMapping("/users/{id}")public Mono<User> getUser(@PathVariable String id) {return userRepository.findById(id).switchIfEmpty(Mono.error(new UserNotFoundException(id)));}
}

3.3.2 Flux(0-N个数据流)

在这里插入图片描述

Flux是Project Reactor中表示0到N个元素的异步序列的核心类型。它代表了一个可能在未来某个时间点产生多个值的异步数据流。

核心特性
  1. 多值序列:Flux可以发出0到N个元素,然后可选地以一个完成信号或错误信号结束
  2. 背压支持:允许消费者控制生产者的速度,防止内存溢出
  3. 延迟执行:与Mono类似,Flux的操作也是惰性的
  4. 丰富的操作符:提供了大量操作符来处理、转换和组合数据流
创建Flux的方式
// 1. 从多个值创建
Flux<String> flux1 = Flux.just("A", "B", "C");// 2. 从数组或集合创建
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C"));// 3. 从范围创建
Flux<Integer> flux4 = Flux.range(1, 5); // 1, 2, 3, 4, 5// 4. 从流生成器创建
Flux<Long> flux5 = Flux.generate(() -> 0L, // 初始状态(state, sink) -> {sink.next(state);if (state == 10) sink.complete();return state + 1;}
);// 5. 从间隔创建(周期性发出值)
Flux<Long> flux6 = Flux.interval(Duration.ofMillis(100)); // 0, 1, 2... 每100ms// 6. 空Flux
Flux<String> flux7 = Flux.empty();// 7. 错误Flux
Flux<String> flux8 = Flux.error(new RuntimeException("Flux error"));
常用操作符

在这里插入图片描述

  1. 转换操作符

    • map:同步转换每个元素
    • flatMap:异步转换每个元素(返回Flux)
    • concatMap:保持顺序的flatMap
    • flatMapSequential:合并结果但保持源顺序
    Flux<String> original = Flux.just("apple", "banana", "cherry");
    Flux<Integer> mapped = original.map(String::length);
    Flux<String> flatMapped = original.flatMap(s -> Flux.fromArray(s.split("")));
    
  2. 过滤操作符

    • filter:基于条件过滤元素
    • take:取前N个元素
    • skip:跳过前N个元素
    • distinct:去重
    Flux<Integer> numbers = Flux.range(1, 10);
    Flux<Integer> evens = numbers.filter(n -> n % 2 == 0);
    Flux<Integer> first5 = numbers.take(5);
    
  3. 组合操作符

    • mergeWith:合并多个Flux,不保证顺序
    • concatWith:顺序连接Flux
    • zipWith:与另一个Flux组合
    • combineLatest:当任一Flux发出值时组合最新值
    Flux<String> fluxA = Flux.just("A", "B", "C");
    Flux<String> fluxB = Flux.just("1", "2", "3");Flux<String> merged = fluxA.mergeWith(fluxB); // A, 1, B, 2, C, 3
    Flux<String> concatenated = fluxA.concatWith(fluxB); // A, B, C, 1, 2, 3
    Flux<String> zipped = fluxA.zipWith(fluxB, (a, b) -> a + b); // A1, B2, C3
    
  4. 错误处理操作符

    • onErrorReturn:出错时返回默认值
    • onErrorResume:出错时切换到备用的Flux
    • onErrorContinue:跳过错误元素并继续处理
    • retry:重试操作
    Flux<Integer> withErrorHandling = Flux.just(1, 2, 0, 4).map(i -> 10 / i).onErrorResume(e -> Flux.just(-1));Flux<Integer> withContinue = Flux.just(1, 2, 0, 4).map(i -> {try {return 10 / i;} catch (Exception e) {throw Exceptions.propagate(e);}}).onErrorContinue((e, o) -> System.out.println("Error for " + o + ": " + e.getMessage()));
    
  5. 背压操作符

    • onBackpressureBuffer:缓冲元素
    • onBackpressureDrop:丢弃无法处理的元素
    • onBackpressureLatest:只保留最新元素
    Flux.range(1, 1000).onBackpressureBuffer(50) // 缓冲区大小为50.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {request(10); // 初始请求10个元素}@Overrideprotected void hookOnNext(Integer value) {System.out.println("Received: " + value);// 根据需要请求更多元素if (value % 10 == 0) {request(10);}}});
    
  6. 时间相关操作符

    • delayElements:延迟发出每个元素
    • timeout:设置超时时间
    • sample:定期采样
    Flux.range(1, 5).delayElements(Duration.ofMillis(500)).subscribe(System.out::println);
    
订阅Flux

在这里插入图片描述

与Mono类似,Flux也是惰性的,需要订阅才能执行:

// 1. 简单订阅
flux.subscribe();// 2. 带消费者的订阅
flux.subscribe(value -> System.out.println("Received: " + value), // onNexterror -> System.err.println("Error: " + error),   // onError() -> System.out.println("Completed"),           // onCompletesubscription -> subscription.request(3)          // 初始请求3个元素
);// 3. 带Subscription控制的订阅
flux.subscribe(new BaseSubscriber<String>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("Subscribed");request(1); // 请求第一个元素}@Overrideprotected void hookOnNext(String value) {System.out.println("Received: " + value);// 处理完当前元素后请求下一个request(1);}
});
实际应用场景

在这里插入图片描述

  1. 流式API:提供持续更新的数据流(如股票价格、传感器数据)
  2. 批量数据处理:处理大量数据时避免内存溢出
  3. 事件处理:处理来自消息队列或事件总线的事件流
  4. 文件处理:逐行读取大文件
@RestController
public class EventController {private final EventPublisher eventPublisher;public EventController(EventPublisher eventPublisher) {this.eventPublisher = eventPublisher;}@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<Event> getEvents() {return eventPublisher.publish().map(event -> {// 转换或丰富事件数据return event;}).onBackpressureBuffer(100).log();}
}

3.3.3 Schedulers(调度器)

在响应式编程中,Schedulers负责管理线程和执行上下文。Project Reactor提供了多种预定义的Scheduler实现,用于控制异步操作的执行位置。

核心概念

在这里插入图片描述

  1. 线程模型:响应式编程通常使用少量线程处理大量并发
  2. 非阻塞I/O:避免线程阻塞,提高资源利用率
  3. 执行上下文:决定操作在哪个线程或线程池上执行
预定义的Scheduler类型
  1. Schedulers.immediate()

    • 在当前线程立即执行
    • 通常用于测试或不需要异步的场景
  2. Schedulers.single()

    • 使用单个可重用的线程
    • 适用于低延迟的轻量级任务
    • 所有调用者共享同一个线程
  3. Schedulers.elastic()(已弃用,推荐使用boundedElastic):

    • 无限扩展的线程池
    • 适合阻塞I/O操作
    • 每个新任务可能创建新线程
  4. Schedulers.boundedElastic()

    • 有界的弹性线程池
    • 默认最多创建10 * CPU核心数的线程
    • 适合阻塞I/O操作
    • 比elastic更可控,避免资源耗尽
  5. Schedulers.parallel()

    • 固定大小的并行线程池
    • 默认大小等于CPU核心数
    • 适合计算密集型任务
  6. Schedulers.fromExecutorService()

    • 从现有的ExecutorService创建
    • 允许与现有线程池集成
使用Scheduler
  1. 发布到Scheduler

    • publishOn:影响后续操作符的执行上下文
    • subscribeOn:影响整个链的订阅上下文(通常用在链的开头)
    Flux.range(1, 5).map(i -> {System.out.println("Map on " + Thread.currentThread().getName());return i * 2;}).publishOn(Schedulers.boundedElastic()).filter(i -> {System.out.println("Filter on " + Thread.currentThread().getName());return i % 3 == 0;}).subscribeOn(Schedulers.parallel()).subscribe();
    
  2. 指定操作符的Scheduler
    许多操作符接受可选的Scheduler参数

    Flux.interval(Duration.ofMillis(100), Schedulers.single()).subscribe(System.out::println);
    
调度策略选择指南
  1. 计算密集型操作

    • 使用parallel()调度器
    • 避免线程切换开销
    • 线程数通常等于CPU核心数
  2. 阻塞I/O操作

    • 使用boundedElastic()调度器
    • 防止阻塞事件循环线程
    • 允许更多的线程处理并发I/O
  3. 非阻塞异步操作

    • 通常不需要显式调度
    • 由底层异步库管理线程
  4. UI交互

    • 使用专用的UI线程调度器
    • 通过Schedulers.fromExecutor()与UI框架集成
最佳实践

在这里插入图片描述

  1. 避免在响应式链中阻塞

    • 如果必须阻塞,使用publishOn切换到适当的调度器
  2. 合理选择调度器

    • 计算密集型:parallel
    • I/O密集型:boundedElastic
    • 事件循环:通常不需要显式调度
  3. 注意上下文传播

    • Reactor Context可以携带跨线程的信息
    • 使用contextWritedeferContextual管理上下文
  4. 资源清理

    • 对于自定义调度器或长时间运行的应用程序,注意关闭调度器
    Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "custom");
    try {Flux.just(1, 2, 3).publishOn(scheduler).subscribe(System.out::println);
    } finally {scheduler.dispose();
    }
    
实际应用示例
@RestController
public class DataController {private final DataService dataService;public DataController(DataService dataService) {this.dataService = dataService;}@GetMapping("/data/{id}")public Mono<Data> getData(@PathVariable String id) {// 使用boundedElastic处理潜在的阻塞操作return Mono.fromCallable(() -> dataService.blockingGetData(id)).subscribeOn(Schedulers.boundedElastic()).timeout(Duration.ofSeconds(2)).onErrorResume(e -> Mono.just(Data.fallbackData()));}@GetMapping("/stream")public Flux<Data> streamData() {// 使用parallel处理计算密集型流return dataService.dataStream().publishOn(Schedulers.parallel()).map(data -> {// 计算密集型转换return data.withProcessedPayload(processPayload(data.getPayload()));}).log();}private String processPayload(String payload) {// 模拟计算密集型处理return payload.toUpperCase();}
}

总结

在这里插入图片描述

Project Reactor为Java响应式编程提供了强大的工具集:

  1. Mono:处理0-1个结果的异步操作,适合单个值或空结果的场景
  2. Flux:处理0-N个结果的异步序列,适合流式数据处理
  3. Schedulers:管理执行上下文,优化线程使用和资源分配

通过合理组合这些组件,开发者可以构建高效、可扩展的响应式应用程序,充分利用现代硬件资源,同时保持代码的清晰和可维护性。响应式编程模型特别适合高并发、低延迟的应用场景,如微服务架构、实时数据处理和事件驱动系统。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com