您的位置:首页 > 汽车 > 新车 > 网站怎样才能在百度被搜索到_遵义页面设计制作_看网站搜索什么关键词_东莞建设企业网站公司

网站怎样才能在百度被搜索到_遵义页面设计制作_看网站搜索什么关键词_东莞建设企业网站公司

2025/1/22 17:41:26 来源:https://blog.csdn.net/qq_30460361/article/details/144812307  浏览:    关键词:网站怎样才能在百度被搜索到_遵义页面设计制作_看网站搜索什么关键词_东莞建设企业网站公司
网站怎样才能在百度被搜索到_遵义页面设计制作_看网站搜索什么关键词_东莞建设企业网站公司

注意: 本文内容于 2024-12-28 21:22:12 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:ReactiveStreams、Reactor、SpringWebFlux。感谢您的关注与支持!

ReactiveStreams是一个处理异步流的规范,定义了Publisher、Subscriber、Subscription、Processor接口。

Reactor是ReactiveStreams的实现,对于Publisher提供了两个核心实现——Mono与Flux。

SpringWebFlux是构建在Reactor之上的响应式Web框架。

本文源码

一、Reactive Streams

Reactive Streams 是一个用于处理异步流数据的标准规范,特别适合处理非阻塞背压控制的场景。

所谓的背压控制,是指在异步数据流中,消费者根据自身的能力向生产者获取数据进行消费,以避免数据积压导致系统过载或者崩溃。

TCP中的拥塞控制,也可以看作是背压控制的一种实现。

1.1 API规范

Reactive Streams 的四大API接口如下

  1. org.reactivestreams.Publisher: 发布者接口,提供数据流。
    • void subscribe(Subscriber<? super T> subscriber)
  2. org.reactivestreams.Subscriber: 订阅者接口,接收数据流。
    • void onSubscribe(Subscription subscription)
    • void onNext(T item)
    • void onError(Throwable throwable)
    • void onComplete()
  3. org.reactivestreams.Subscription: 订阅关系接口,提供控制机制。
    • void request(long n)
    • void cancel()
  4. org.reactivestreams.Processor: 继承Publisher和Subscriber的接口。

简单绘制一个时序图,加深对整个链路的理解。

使用Publisher、Subscriber、Subscription实现一个简单的订阅功能,示例如下

以下代码,并没有异步相关的内容。只是为了学习整个API流转链路。

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;public class Example01 {private static final Logger log = LoggerFactory.getLogger(Example01.class);/*** 订阅关系*/public static Subscription getSubscription(Subscriber<? super String> subscriber, String... items) {return new Subscription() {private final AtomicBoolean canceled = new AtomicBoolean(false);private final AtomicInteger sendItems = new AtomicInteger(0);/*** request数据* 内部onNext会request后面的数据,而onComplete应该要等所有的数据消费完毕后,才会执行。* 故需要加锁保证线程安全,此处采取CAS。源码参考reactor.core.publisher.Operators.ScalarSubscription#request(long)*/@Overridepublic void request(long n) {if (n > 0) {if (canceled.get()) {return;}if (sendItems.get() >= items.length) {subscriber.onComplete();} else {subscriber.onNext(items[sendItems.getAndIncrement()]);}}}@Overridepublic void cancel() {canceled.compareAndSet(true, true);}};}/*** 发布者*/private static Publisher<String> getPublisher(String... items) {return new Publisher<String>() {@Overridepublic void subscribe(Subscriber<? super String> subscriber) {subscriber.onSubscribe(getSubscription(subscriber, items));}};}/*** 订阅者*/private static Subscriber<String> getSubscriber() {return new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;log.info("Subscribed to {}", s);// 请求第一个元素subscription.request(1);}@Overridepublic void onNext(String s) {log.info("Received {}", s);// 请求下一个元素subscription.request(1);}@Overridepublic void onError(Throwable t) {log.error("Error occurred", t);}@Overridepublic void onComplete() {log.info("All items received");}};}public static void main(String[] args) {// 订阅Flux// Flux.just("first", "second", "third").delayElements(Duration.ofSeconds(2))//         .subscribe(getSubscriber());/*** org.reactivestreams.Publisher: 发布者* org.reactivestreams.Subscriber: 订阅者* org.reactivestreams.Subscription: 发布者和订阅者之间的桥梁,数据流控制的核心机制。*/// 订阅自定义PublishergetPublisher("first", "second", "third", "fourth", "fifth").subscribe(getSubscriber());while (true) {}}
}

运行结果

1.2 API实现库

Reactive Streams实现如下

  • Java9+ java.util.concurrent.Flow
  • RxJava: Reactive Extension Java
  • Reactor: Reactor Framework

Java9+中提供了java.util.concurrent.Flow,在标准库中提供ReactiveStreams规范的接口。

ReactiveStreams内部也提供了适配JDK中Flow的适配器org.reactivestreams.FlowAdapters

RxJava以及Reactor,分别用于Java开发中不同领域。RxJava一般用于Android开发,Reactor一般用于Spring开发。

二、Reactor

Reactor提供了两个核心类

  1. reactor.core.publisher.Flux:发布0或N个元素的异步数据流
  2. reactor.core.publisher.Mono:发布0或1个元素的异步数据流

这两者都是Publisher,主要区别在于发送数据的数量。因此在使用上,相关的API都是差不多的。

2.1 Mono

Mono中的静态方法,用于创建Mono实例。

Mono实例中的成员方法如下

方法名说明
and合并多个 Mono 实例,等所有 Mono 完成后返回一个新的 Mono
as用指定的类型包裹当前 Mono,通常用于类型转换。
block阻塞并获取 Mono 的结果,直到执行完成。
blockOptional类似于 block,但返回 Optional 包裹的结果。
cache缓存当前 Mono 的值,使得未来的订阅者可以共享相同的结果。
cacheInvalidateIf缓存失效条件满足时重新缓存,适用于动态失效策略。
cacheInvalidateWhen在指定条件下使缓存失效。
cancelOn当给定的 Publisher 发出信号时,取消当前 Mono
cast强制类型转换为指定的类型。
checkpoint在流的执行过程中插入检查点,用于调试。
concatWith与另一个 MonoFlux 连接,按顺序执行。
contextWrite修改 Mono 的上下文。
defaultIfEmpty如果 Mono 为空,返回默认值。
delayElement延迟发出元素的时间。
delaySubscription延迟订阅,等到指定的时间或事件发生才开始订阅。
delayUntil延迟直到指定的 Publisher 发出信号时才开始执行。
dematerialize将一个包含 SignalMono 转换为原始值的 Mono
doAfterSuccessOrError在执行成功或出错后执行的操作。
doAfterTerminateMono 结束时执行的操作,不论成功或失败。
doFinallyMono 完成时执行的最终操作。
doFirstMono 执行前执行的操作。
doOnCancel当订阅者取消时执行的操作。
doOnDiscard当元素被丢弃时执行的操作。
doOnEach对每个发出的信号执行操作。
doOnError当发生错误时执行的操作。
doOnNext每次元素发出时执行的操作。
doOnRequest在请求信号到达时执行的操作。
doOnSubscribe在订阅时执行的操作。
doOnSuccess当成功完成时执行的操作。
doOnSuccessOrError无论成功还是失败,都执行的操作。
doOnTerminate在终止时执行的操作。
elapsed返回每个信号的时间戳。
expand展开 Mono,生成新的 Mono,直到满足某个条件。
expandDeep深度展开 Mono,通常递归调用直到满足条件。
filter过滤元素,只有符合条件的元素才会发出。
filterWhen使用 Publisher 的元素条件来过滤当前 Mono
flatMap转换元素,返回新的 MonoFlux
flatMapIterable将每个元素转换为一个可迭代的元素。
flatMapMany将元素转换为 Flux
fluxMono 转换为 Flux
handle基于元素的条件来决定如何处理流。
hasElement判断是否包含元素。
hide隐藏 Mono 的实现细节,返回一个不可观察的 Mono
ignoreElement忽略元素,只关心是否完成。
log记录 Mono 中的信号,便于调试。
map将元素映射为另一个元素。
mapNotNull映射并排除空值。
materialize将信号转化为一个 Signal 对象。
mergeWith合并当前 Mono 和另一个 Mono
metrics获取流的度量信息。
nameMono 设置名称,用于调试和监控。
ofType根据类型过滤信号。
onErrorContinue在发生错误时继续执行。
onErrorMap将错误映射为其他类型。
onErrorResume在发生错误时恢复操作。
onErrorReturn在发生错误时返回默认值。
onErrorStop在发生错误时终止流。
onTerminateDetach在终止时解除与订阅者的连接。
or连接另一个 Mono,如果当前 Mono 没有值或为空时执行。
publish启动 Mono 并返回一个共享的流。
publishOn指定在哪个线程调度上下文中执行 Mono
repeat重复执行 Mono,直到满足某个条件。
repeatWhen基于另一个 Publisher 的信号来控制重复。
repeatWhenEmptyMono 为空时重复执行。
retry在发生错误时重试操作。
retryWhen基于另一个 Publisher 来控制重试。
share共享执行的结果,避免重复执行。
single获取 Mono 中唯一的元素。
subscribe启动流的执行并订阅。
subscribeOn指定在哪个线程调度上下文中订阅 Mono
subscribeWith通过指定的 Subscriber 订阅 Mono
subscriberContext获取或修改订阅时的上下文。
switchIfEmpty如果 Mono 为空,则切换到另一个 Mono
tagMono 打上标签,用于调试和日志。
take限制只获取前 N 个元素。
takeUntilOther当另一个 Publisher 发出信号时停止当前 Mono
then在当前 Mono 执行完后执行另一个操作。
thenEmpty在当前 Mono 执行完后返回一个空的 Mono
thenMany在当前 Mono 执行完后返回一个 Flux
thenReturn在当前 Mono 执行完后返回指定的值。
timed返回元素和其时间戳。
timeout如果 Mono 在指定时间内没有发出信号,则触发超时。
timestamp返回元素及其时间戳。
toFutureMono 转换为 Future
toProcessorMono 转换为 Processor,适用于与 Flux 的结合。
toString返回 Mono 的字符串表示。
transform使用转换函数修改 Mono
transformDeferred延迟转换,直到订阅发生。
transformDeferredContextual延迟转换并访问上下文。
zipWhen与另一个 Mono 的信号配对,形成 Mono 的组合。
zipWith与另一个 Mono 的信号进行合并,形成 Mono 的组合。

2.2 Flux

Flux中的静态方法,用于创建Flux实例。

Flux实例中的成员方法如下

方法名说明
all判断 Flux 中的所有元素是否满足给定条件。
any判断 Flux 中是否有任何一个元素满足给定条件。
asFlux 转换为指定类型的 Publisher
blockFirst阻塞并返回 Flux 中的第一个元素。
blockLast阻塞并返回 Flux 中的最后一个元素。
bufferFlux 中的元素分成固定大小的缓冲区。
bufferTimeout按照指定的时间或缓冲区大小将元素分块。
bufferUntil在满足某个条件时开始一个新的缓冲区。
bufferUntilChanged将相邻相同的元素合并到同一个缓冲区。
bufferWhen根据外部 Publisher 切换缓冲区。
bufferWhile按照指定条件将元素分组为缓冲区。
cache缓存 Flux 的值,使得未来的订阅者可以共享相同的结果。
cancelOn当另一个 Publisher 发出信号时取消当前的 Flux
castFlux 强制转换为指定的类型。
checkpoint在执行流中插入检查点,用于调试和分析。
collect收集流中的元素,按给定规则生成结果。
collectList收集 Flux 中的所有元素并返回一个 List
collectMapFlux 中的元素收集为一个 Map
collectMultimapFlux 中的元素收集为一个多值 Map
collectSortedListFlux 中的元素收集为排序的 List
concatMap将元素转换为 Mono,按顺序处理。
concatMapDelayErrorconcatMap 类似,但在错误发生时延迟处理。
concatMapIterable将每个元素转换为可迭代的元素,并按顺序合并。
concatWith与另一个 Flux 连接,按顺序执行。
concatWithValues连接多个值作为新的 Flux
contextWrite修改 Flux 的上下文。
count统计 Flux 中元素的数量。
defaultIfEmpty如果 Flux 为空,则返回默认值。
delayElements延迟元素的发出。
delaySequence延迟整个序列的发出。
delaySubscription延迟订阅,直到指定的时间或事件发生。
delayUntil延迟直到另一个 Publisher 发出信号。
dematerialize将一个包含 SignalFlux 转换为原始元素的 Flux
distinct过滤掉重复的元素,保持唯一性。
distinctUntilChanged过滤掉相邻重复的元素。
doAfterTerminateFlux 完成后执行的操作。
doFinallyFlux 终止时执行的操作。
doFirstFlux 执行前执行的操作。
doOnCancelFlux 被取消时执行的操作。
doOnCompleteFlux 完成时执行的操作。
doOnDiscard在元素被丢弃时执行的操作。
doOnEachFlux 发出的每个元素执行操作。
doOnError在发生错误时执行的操作。
doOnNext每次 Flux 发出元素时执行的操作。
doOnRequest在请求信号到达时执行的操作。
doOnSubscribe在订阅时执行的操作。
doOnTerminateFlux 终止时执行的操作。
elapsed获取每个元素的时间戳和持续时间。
elementAt获取指定索引处的元素。
expand对每个元素进行展开,生成新的元素流。
expandDeep深度展开 Flux,通常递归展开元素。
filter过滤出符合条件的元素。
filterWhen使用外部 Publisher 的信号过滤 Flux 中的元素。
flatMap将元素转换为 Flux,并合并其发出的所有元素。
flatMapDelayError在发生错误时延迟元素的转换。
flatMapIterable将元素转换为可迭代的 Flux
flatMapSequential顺序地将元素转换为 Flux
flatMapSequentialDelayError顺序转换,并在发生错误时延迟。
getPrefetch获取 Flux 的预取量。
groupBy将元素按指定的键分组。
groupJoin类似 groupBy,但用于联接多个流。
handle根据元素的条件进行流的处理。
hasElement判断 Flux 中是否包含某个元素。
hasElements判断 Flux 中是否包含多个元素。
hide隐藏 Flux 的实现细节,返回不可观察的流。
ignoreElements忽略 Flux 中的所有元素,只关心终止信号。
index返回元素在流中的索引。
join将多个 Flux 中的元素合并为一个字符串。
last获取 Flux 中的最后一个元素。
limitRate限制从流中请求的元素数量。
limitRequest限制从流中请求的最大元素数量。
log记录流中的元素,用于调试。
map将元素映射为新的类型。
mapNotNull映射并排除空值。
materialize将信号转换为 Signal 对象。
mergeComparingWith将两个 Flux 合并并根据比较条件排序。
mergeOrderedWith将两个有序的 Flux 合并。
mergeWith合并当前 Flux 和另一个 Flux
metrics获取流的度量信息。
nameFlux 设置名称,便于调试。
next获取 Flux 中的下一个元素。
ofType根据类型过滤信号。
onBackpressureBuffer在背压时缓存元素。
onBackpressureDrop在背压时丢弃元素。
onBackpressureError在背压时触发错误。
onBackpressureLatest在背压时保留最新的元素。
onErrorContinue在发生错误时继续执行。
onErrorMap在错误时将其映射为其他类型。
onErrorResume在错误时恢复操作。
onErrorReturn在错误时返回默认值。
onErrorStop在错误时终止流。
onTerminateDetach在终止时分离与订阅者的连接。
or连接另一个 Flux,如果当前 Flux 为空时执行。
parallelFlux 分发到多个线程进行并行处理。
publish启动 Flux 并返回一个共享流。
publishNext在流的每个元素发出时开始新的发布。
publishOn指定在哪个线程调度上下文中执行流。
reduce将流中的所有元素合并为单一值。
reduceWith使用指定初始值对元素进行合并。
repeat重复执行 Flux 直到满足某个条件。
repeatWhen基于另一个 Publisher 的信号来控制重复。
replay缓存并重播流中的元素。
retry在发生错误时重试操作。
retryWhen基于另一个 Publisher 来控制重试。
sample每隔指定时间间隔取一个元素。
sampleFirst获取流中的第一个元素。
sampleTimeout超过指定时间间隔时触发超时操作。
scan对流中的元素执行累加操作。
scanWith使用给定的初始值对元素执行累加操作。
share共享流的执行,避免重复执行。
shareNext将下一个发出的元素共享给多个订阅者。
single获取 Flux 中唯一的元素。
singleOrEmpty获取 Flux 中唯一的元素,如果为空返回空。
skip跳过流中的前 N 个元素。
skipLast跳过流中的最后 N 个元素。
skipUntil跳过直到满足某个条件的元素。
skipUntilOther跳过直到另一个 Flux 发出信号时的元素。
skipWhile跳过直到满足条件的元素。
sort对流中的元素进行排序。
startWith在流的开始处添加额外元素。
subscribe订阅并启动 Flux
subscribeOn指定在哪个线程调度上下文中订阅流。
subscribeWith通过指定的 Subscriber 订阅流。
subscriberContext获取或修改订阅时的上下文。
switchIfEmpty如果 Flux 为空,则切换到另一个 Flux
switchMap将元素转换为另一个 Flux 并切换执行。
switchOnFirst在流开始时选择一个 Flux 进行切换。
tagFlux 打标签,便于调试和日志。
take限制只获取前 N 个元素。
takeLast获取流中的最后 N 个元素。
takeUntil获取直到满足条件为止的元素。
takeUntilOther获取直到另一个 Flux 发出信号时的元素。
takeWhile获取满足条件的元素,直到条件不满足为止。
then在当前流完成后执行另一个操作。
thenEmpty在当前流完成后返回一个空流。
thenMany在当前流完成后返回另一个 Flux
timed返回每个元素的时间戳和持续时间。
timeout如果 Flux 在指定时间

三、SpringWebFlux

3.1 WebHandler与WebFilter

在SpringMVC中,有Servlet、Filter。

在SpringWebFlux中,有WebHandler、WebFilter,对标的其实就是Servlet API中的Servlet、Filter。甚至执行链也是相似的设计。

Servlet相关知识阅读Servlet - 言成言成啊

Filter相关知识阅读Filter和Listener - 言成言成啊

WebFilter的注册如下

@Bean
@Order(0) // 值越小,优先级越高
@ConditionalOnProperty(name = "allowAllCors.learnFilter", havingValue = "true")
public WebFilter aFilter() {/*** 在servlet中。请求的扭转是 aFilter-->bFilter-->servlet-->bFilter-->aFilter* 在webflux中同理。Filter对应WebFilter,Servlet对应WebHandler*/return (exchange, chain) -> {log.info("aFilter start");return chain.filter(exchange).doOnSuccess(t -> log.info("aFilter end"));};
}

3.2 实际案例

跨域配置

@Bean
@Order(Integer.MIN_VALUE)
@ConditionalOnProperty(name = "allowAllCors.personal", havingValue = "true")
public WebFilter personalCorsFilter(WebSocketHandlerAdapter webFluxWebSocketHandlerAdapter) {WebFilter webFilter = (exchange, chain) -> {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();HttpHeaders headers = response.getHeaders();//用*会导致范围过大,浏览器出于安全考虑,在allowCredentials为true时会不认*这个操作,因此可以使用如下代码,间接实现允许跨域headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, request.getHeaders().getFirst("origin"));headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "*");headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, "*");//允许跨域发送cookieheaders.set(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");if ("OPTIONS".equalsIgnoreCase(request.getMethod().name())) {response.setStatusCode(HttpStatus.OK);return Mono.empty();} else {return chain.filter(exchange);}};log.info("allowAllCors.personal is set to true");return webFilter;}

全局异常拦截/定义响应格式

首先,定义通用响应格式

import lombok.Data;
import reactor.core.publisher.Mono;@Data
public class Resp<T> {private int code;private String msg;private T data;public static <T> Resp<T> ok(T t) {Resp<T> resp = new Resp<>();resp.setCode(0);resp.setMsg("成功");resp.setData(t);return resp;}public static Resp<Void> failure(String msg) {Resp<Void> resp = new Resp<>();resp.setCode(1);resp.setMsg("失败: " + msg);return resp;}public static Resp<Void> error() {Resp<Void> resp = new Resp<>();resp.setCode(500);resp.setMsg("服务器内部错误");return resp;}public static <T> Mono<Resp<T>> getSuccessResp(Mono<T> mono) {return mono.map(Resp::ok);}public static Mono<Resp<Void>> getFailureResp(String msg) {return Mono.just(failure(msg));}public static Mono<Resp<Void>> getErrorResp() {return Mono.just(error());}
}

其次,定义自定义异常DIYException。

最后,配置全局异常拦截。

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import reactor.core.publisher.Mono;
import top.meethigher.utils.Resp;@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {@ExceptionHandler(Exception.class)public Mono<Resp<Void>> handleException(Exception e) {log.error("api occurred exception", e);return Resp.getErrorResp();}@ExceptionHandler(DIYException.class)public Mono<Resp<Void>> handleDiyException(DIYException e) {log.error("api occurred exception", e);return Resp.getFailureResp(e.getMessage());}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com