您的位置:首页 > 健康 > 美食 > 教务管理系统下载_武汉网页制作速成班_广州seo营销培训_培训心得体会感悟

教务管理系统下载_武汉网页制作速成班_广州seo营销培训_培训心得体会感悟

2025/1/7 9:52:07 来源:https://blog.csdn.net/weixin_43889578/article/details/137243321  浏览:    关键词:教务管理系统下载_武汉网页制作速成班_广州seo营销培训_培训心得体会感悟
教务管理系统下载_武汉网页制作速成班_广州seo营销培训_培训心得体会感悟

【深入理解SpringCloud微服务】了解微服务的熔断、限流、降级,手写实现一个微服务熔断限流器

  • 服务雪崩
  • 熔断、限流、降级
    • 熔断
    • 降级
    • 限流
  • 手写实现一个微服务熔断限流器
    • 架构设计
    • 代码实现
      • 整体逻辑
      • ProtectorAspect#aroundMethod(ProceedingJoinPoint)具体实现
        • 1、获取接口对应的类名、方法名、属性类型
        • 2、如果接口启用了限流器,调用限流器进行验证是否需要限流
        • 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行
        • 4、执行目标方法,并记录断路器(成功/失败)
      • 降级回调

服务雪崩

服务雪崩时分布式系统中会遇到的一个问题,由于分布式系统存在服务间调用的关系,一个服务不可用,往往会影响到调用该服务的另外一个服务。

在分布式系统中,一个服务由于程序bug或流量过大导致不可用,进而导致调用该服务的另一个服务也不可用,然后这种不可用继续影响上游其他服务导致其不可用,这种不可用的情况不断放大的过程,就是服务雪崩。

在这里插入图片描述

而应对这种服务雪崩问题的解决方案就是熔断、限流、降级。

熔断、限流、降级

熔断

熔断的作用类似于电路中的保险丝。电路中正确安置保险丝,保险丝就会在电流异常升高到一定的高度和热度的时候,自身熔断切断电流,保护了电路安全运行。

熔断通过增加一个断路器实现。

在这里插入图片描述

断路器一般有三个状态:关闭、打开、半开。

  • 关闭状态:如同一个开关处于闭合状态,此时接口可以执行正常的逻辑处理。
  • 打开状态:如果一个开关处于打开状态,此时接口的所有请求都会被降级处理。
  • 半开状态:处于半开状态的断路器,允许一个请求执行正常的逻辑处理,这相当于是试探性的调用一下,如果能正常返回结果,那么断路器恢复为闭合状态,否则变为打开状态。

断路器的三种状态间的转换如下图:

在这里插入图片描述

当断路器处于打开状态时,由于接口的正常逻辑无法执行,此时就是进行降级处理。

降级

降级是指原有的逻辑无法正常执行,转而执行的一段备用的有损的逻辑。

比如服务A的接口调用服务B的接口发生异常或超时,那么服务A的接口进行降级逻辑,调用本地的一个方法,返回默认的数据。

在这里插入图片描述

除此以外,断路器打开,那么也会执行降级逻辑,而不会走正常逻辑。

在这里插入图片描述

限流

限流也叫流控,也就是流量控制,作用是限制流入服务接口的流量大小。每一个服务或接口能扛住的流量大小都是有上限的,如果超过了这个上限,就有可能造成性能下降或服务不可用。

限流通过在服务或接口前添加限流器实现。如果流量没有超过限流器的阈值,那么请求可以正常通过;如果流量超过限流器的阈值,那么超过阈值的这一部分请求就要被拦截掉。

在这里插入图片描述

被流控的请求,可以报错返回错误信息,也可以走降级处理的逻辑。

在这里插入图片描述

限流器的限流算法包括:

  • 简单时间窗算法
  • 滑动时间窗算法
  • 漏桶算法
  • 令牌桶算法

这些算法在下面会用代码进行解析,这里先跳过。

手写实现一个微服务熔断限流器

架构设计

我们设计的熔断限流框架包括的组件有:

  • FlowLimiter:限流器
  • Breaker:断路器
  • Fallback:降级逻辑

在这里插入图片描述

当我们的系统接进来一个请求时,先经过FlowLimiter(限流器)处理。如果FlowLimiter的当前流量还没有达到限流阈值,请求正常往下走;如果FlowLimiter的当前流量已经达到了限流阈值,那么就转而走Fallback(降级逻辑)进行降级处理。

如果一个请求成功通过了FlowLimiter后,就会到Breaker(断路器)的处理。断路器判断自身当前状态,如果是关闭状态或半开状态,请求都可以正常通过;如果断路器状态是打开状态,那么请求走Fallback(降级逻辑)进行降级处理。

如果一个请求成功通过了Breaker后,就会执行目标方法。如果目标方法正常执行,那么返回正常处理结果;如果目标方法执行异常或超时,那么请求走Fallback(降级逻辑)进行降级处理。

在这里插入图片描述

总体上,我们的框架是通过AOP给目标方法生成动态代理对象,在AOP的增强逻辑中实现的熔断、限流、降级等功能。

代码实现

整体逻辑

首先我们定义一个注解。

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Protected {...
}

@Protected注解用于修饰指定接口或方法,表示这个接口或方法已经被我们的框架保护,具有熔断、限流、降级等效果。

然后需要定义一个切面类。

/*** @author huangjunyi* @date 2023/12/19 19:24* @desc*/
@Aspect
public class ProtectorAspect implements PriorityOrdered {...@Pointcut("@annotation(com.huangjunyi1993.simple.microservice.protector.aop.Protected)")public void pointCut() {}@Around(value = "pointCut()")public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {...}...}

我们通过SpringAOP扫描被@Protected注解修饰的方法,给它们生成代理对象,并且通过SpringAOP的@Around注解定义了环绕增强逻辑,当请求被@Protected注解修饰的接口方法接收时,就会进入这里的增强逻辑,增强逻辑做的自然就是熔断、限流、降级等这些事情。

在这里插入图片描述

而使用的时候,只要在对应接口方法上添加@Protected注解,并指定对应的属性即可(@Protected注解中的各属性会在后面讲解)

    @GetMapping("/flow/simple")@Protected(name = "testFlowSimple", enableFlowLimiter = true, flowLimiterName = FlowLimiterNameConstant.SIMPLE_TIME_WINDOW, limit = 2) // 测试限流效果public Map<String, Object> testFlowSimple() {...}

ProtectorAspect#aroundMethod(ProceedingJoinPoint)整体流程:

    @Around(value = "pointCut()")public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {// 1、获取接口对应的类名、方法名、属性类型...// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {...// 3、如果接口启用了断路器,检查断路器整体决定是否继续往下执行// 如果断路器打开,执行fallback逻辑if (annotation.enableBreaker()) {...}// 4、执行目标方法,并记录断路器(成功/失败),// 如果超时或异常,执行fallback逻辑}// 被流控了,执行fallback逻辑}

在这里插入图片描述

下面就对ProtectorAspect#aroundMethod(ProceedingJoinPoint)方法中的具体实现进行讲解。

ProtectorAspect#aroundMethod(ProceedingJoinPoint)具体实现

1、获取接口对应的类名、方法名、属性类型
    @Around(value = "pointCut()")public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {// 1、获取接口对应的类名、方法名、属性类型MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();// 类名String className = methodSignature.getDeclaringTypeName();// 方法名String methodName = methodSignature.getMethod().getName();// 参数类型列表Class[] parameterTypes = methodSignature.getParameterTypes();// @Protected注解Protected annotation = methodSignature.getMethod().getAnnotation(Protected.class);// 取得@Protected注解的name属性,如果没有,则生成一个String name = StringUtils.isNotBlank(annotation.name()) ? annotation.name() : EntryPointSymbolGenerator.get(className, methodName, parameterTypes);// 如果@Protected注解指定了Fallback实现类全限定名,生成并缓存该Fallback实例Fallback fallback = null;if (StringUtils.isNotBlank(annotation.fallback())) {fallback = FallbackCacheUtil.getInstance(annotation.fallback());}// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {...// 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行// 如果断路器打开,执行fallback逻辑if (annotation.enableBreaker()) {...}// 4、执行目标方法,并记录断路器(成功/失败),// 如果超时或异常,执行fallback逻辑}// 被流控了,执行fallback逻辑}

第一步没做什么特别的事情,就是获取类名className、方法名methodName、参数类型列表parameterTypes。

除此以外获取@Protected注解的name属性,下面会用这个name去生成限流器FlowLimiter和断路器Breaker,不同的FlowLimiter和Breaker之间,就是通过name区分。

然后还会判断一下@Protected注解是否声明了当前接口方法对应的Fallback实现类全限定名,也就是@Protected注解的fallback属性是否不为空,如果不为空,那么生成并缓存对应的Fallback实例。

在这里插入图片描述
FallbackCacheUtil.getInstance(annotation.fallback())生成并缓存Fallback实现类实例的逻辑:

/*** @author huangjunyi* @date 2023/12/27 15:26* @desc*/
public class FallbackCacheUtil {private static final Map<String, Fallback> CALLBACK_CACHE_MAP = new HashMap<>();public static Fallback getInstance(String fallbackClassName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {// 缓存Map中有,则从缓存中返回if (CALLBACK_CACHE_MAP.containsKey(fallbackClassName)) {return CALLBACK_CACHE_MAP.get(fallbackClassName);}// 反射创建并放入缓存MapFallback callback = (Fallback) Class.forName(fallbackClassName).newInstance();CALLBACK_CACHE_MAP.put(fallbackClassName, callback);return callback;}}

在这里插入图片描述

2、如果接口启用了限流器,调用限流器进行验证是否需要限流

然后接下来轮到限流器出场,就是这个if判断:

        // 2、如果接口启用了限流器,调用限流器进行验证是否需要限流if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {...}

限流器的所有逻辑都在这个if判断中。

首先是判断@Protected注解的enableFlowLimiter属性是否为true,如果为true,表示启用限流器,才会有后面的checkCanPass(annotation, name)判断;如果enableFlowLimiter属性为false,表示不启用限流器,那么就直接通过了,也就是直接进入if分支内进行下一步处理。

在这里插入图片描述

    private boolean checkCanPass(Protected annotation, String name) {// 根据name通过限流器工厂获取对应的限流器FlowLimiter flowLimiter = flowLimiterFactory.get(name);if (flowLimiter == null) {// 如果限流去工厂没有缓存的限流器,调用限流器工厂创建一个并缓存flowLimiter = flowLimiterFactory.create(annotation);}// 调用限流器的canPass方法进行验证return flowLimiter.canPass(name);}

上面的flowLimiterFactory是FlowLimiterFactory类型,FlowLimiterFactory是一个限流器工厂接口。

/*** @author huangjunyi* @date 2024/1/2 19:39* @desc*/
public interface FlowLimiterFactory {/*** 根据资源名称获取限流器* @param name 资源名称 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#name()}* @return 限流器*/FlowLimiter get(String name);/*** 根据注解创建指定的限流器* @param annotation Protected注解 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected}* @return 限流器*/FlowLimiter create(Protected annotation);}

用户可以实现自定义的FlowLimiterFactory,也可以使用默认的FlowLimiterFactory实现类。默认的FlowLimiterFactory是CacheFlowLimiterFactory,但CacheFlowLimiterFactory不是直接实现的FlowLimiterFactory接口,而是继承了SPIFlowLimiterFactory,SPIFlowLimiterFactory才实现了FlowLimiterFactory接口。

SPIFlowLimiterFactory通过Java的SPI机制创建FlowLimiter实例:

public abstract class SPIFlowLimiterFactory implements FlowLimiterFactory {@Overridepublic FlowLimiter create(Protected annotation) {// 通过ServiceLoader加载所有FlowLimiterBuilderServiceLoader<FlowLimiterBuilder> flowLimiterBuilders = ServiceLoader.load(FlowLimiterBuilder.class);for (FlowLimiterBuilder builder : flowLimiterBuilders) {// 获取FlowLimiterBuilder上修饰的@FlowLimiterName注解,// 看是否与@Protected中的flowLimiterName属性匹配,// 如果匹配,调用FlowLimiterBuilder的build方法创建if (StringUtils.equals(builder.getClass().getAnnotation(FlowLimiterName.class).name(), annotation.flowLimiterName())) {return builder.build(annotation);}}throw new ProtectorException("Create flowLimiter failed");}}

比如@Protected注解声明了属性flowLimiterName = FlowLimiterNameConstant.SIMPLE_TIME_WINDOW,那么就匹配到简单时间窗算法的FlowLimiterBuilder,它会创建一个SimpleTimeWindowFlowLimiter(简单时间窗限流器)

/*** @author huangjunyi* @date 2024/1/3 19:34* @desc*/
@FlowLimiterName(name = SIMPLE_TIME_WINDOW)
public class SimpleTimeWindowFlowLimiterBuilder implements FlowLimiterBuilder {@Overridepublic FlowLimiter build(Protected annotation) {if (annotation.limit() <= 0) {throw new ProtectorException("Protected annotation parameter invalid");}return new SimpleTimeWindowFlowLimiter(annotation.limit());}
}

SPIFlowLimiterFactory是抽象类,CacheFlowLimiterFactory继承了SPIFlowLimiterFactory,添加了缓存已创建的FlowLimiter的逻辑,避免重复创建。

/*** @author huangjunyi* @date 2024/1/2 19:43* @desc*/
public class CacheFlowLimiterFactory extends SPIFlowLimiterFactory{private static final Map<String, FlowLimiter> FLOW_LIMITER_MAP = new ConcurrentHashMap<>();@Overridepublic FlowLimiter get(String name) {// 直接从map缓存中取return FLOW_LIMITER_MAP.get(name);}@Overridepublic FlowLimiter create(Protected annotation) {// 调用父类SPIFlowLimiterFactory进行创建,然后缓存到map中FlowLimiter flowLimiter = super.create(annotation);FLOW_LIMITER_MAP.put(annotation.name(), flowLimiter);return flowLimiter;}
}

在这里插入图片描述

用户可以实现自己的FlowLimiter和FlowLimiterBuilder,只要FlowLimiterBuilder上的@FlowLimiterName注解与接口方法上的@Protected注解声明的属性flowLimiterName匹配,自定义的FlowLimiter就会生效。

然后flowLimiter.canPass(name)就是调用断路器FlowLimiter的检查方法,检查当前请求是否需要被限流。canPass方法返回true,表示请求可通过,无需限流;否则返回false,表示请求不可通过,需要限流。

FlowLimiter也是一个接口,提供了简单时间窗、滑动时间窗、漏桶、令牌桶四种限流算法的FlowLimiter实现,这些算法打算在下一篇文章分析。用户也可以实现自己的FlowLimiter自定义限流流算法。

在这里插入图片描述

3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行
    @Around(value = "pointCut()")public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {// 1、获取接口对应的类名、方法名、属性类型...// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {...// 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行// 如果断路器打开,执行fallback逻辑if (annotation.enableBreaker()) {// 从断路器工厂BreakerFactory中获取断路器breaker = breakerFactory.get(name);if (breaker == null) {// 如果没有获取到,创建断路器breaker = createBreaker(breakerFactory, name, annotation);}// 调用断路器进行检验if (!breaker.canPass()) {throw new ProtectorException("breaker has tripped");}}// 4、执行目标方法,并记录断路器(成功/失败),// 如果超时或异常,执行fallback逻辑}// 被流控了,执行fallback逻辑}private Breaker createBreaker(BreakerFactory breakerFactory, String name, Protected annotation) {// 调用断路器工厂创建断路器并缓存// @Protected的timeSpan()属性表示统计失败率的时间跨度// @Protected的maxFailedNum()属性表示断路器最大容忍失败数// @Protected的openTime()属性表示断路器打开时的持续时间return breakerFactory.create(name, annotation.timeSpan(), annotation.maxFailedNum(), annotation.openTime());}

断路器创建的流程基本跟限流器一样,先看看BreakerFactoryd的get方法有没有,没有就调工厂的create方法创建。

在这里插入图片描述

BreakerFactory是一个接口:

public interface BreakerFactory {/*** 获取断路器* @param name 资源名称 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#name()}* @return 断路器*/Breaker get(String name);/*** 创建断路器* @param name 资源名称 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#name()}* @param timeSpan 时间跨度 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#timeSpan()}* @param maxFailedNum 时间跨度内最大允许失败数 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#maxFailedNum()}* @param openTime 断路器开路时间 {@link com.huangjunyi1993.simple.microservice.protector.aop.Protected#openTime()}* @return 断路器*/Breaker create(String name, int timeSpan, int maxFailedNum, long openTime);}

我们提供的默认实现类是BasicBreakerFactory,当然用户也可以自己实现自定义BreakerFactory。

/*** @author huangjunyi* @date 2023/12/28 11:02* @desc*/
public class BasicBreakerFactory implements BreakerFactory {private static final Map<String, Breaker> BREAKER_MAP = new ConcurrentHashMap<>();@Overridepublic  Breaker get(String name) {// 从缓存map中取return BREAKER_MAP.get(name);}@Overridepublic Breaker create(String name, int timeSpan, int maxFailedNum, long openTime) {// BasicBreakerFactory默认创建的就是BasicBreakerBreaker breaker = new BasicBreaker(timeSpan, maxFailedNum, openTime);// 放入缓存BREAKER_MAP.put(name, breaker);return breaker;}}

在这里插入图片描述

BasicBreaker中的断路器算法,也是放在后面的文章分析。

4、执行目标方法,并记录断路器(成功/失败)
```java@Around(value = "pointCut()")public Object aroundMethod(ProceedingJoinPoint joinPoint) throws IllegalAccessException, InstantiationException, ClassNotFoundException {// 1、获取接口对应的类名、方法名、属性类型...// 2、如果接口启用了限流器,调用限流器进行验证是否需要限流if (!annotation.enableFlowLimiter() || checkCanPass(annotation, name)) {...// 3、如果接口启用了断路器,检查断路器状态决定是否继续往下执行// 如果断路器打开,执行fallback逻辑if (annotation.enableBreaker()) {...}// 4、执行目标方法,并记录断路器(成功/失败),// 如果超时或异常,执行fallback逻辑if (annotation.enableTimeout() && annotation.timeout() > 0) {Fallback finalFallback = fallback;try {Breaker finalBreaker = breaker;// 如果@Protected中的enableTimeout属性为true,表示开启超时保护机制,// 并且@Protected设置的timeout超时时间大于零// 在CompletableFuture#supplyAsync()中执行// CompletableFuture#get(timeout)阻塞等待timeout指定的事件长度,超过了就不等了,作超时处理return CompletableFuture.supplyAsync(() -> execute(joinPoint, finalFallback, finalBreaker, annotation.enableBreaker())).get(annotation.timeout(), TimeUnit.MILLISECONDS);} catch (TimeoutException e) {LOGGER.error("ProtectorAspect aroundMethod timeout:", e);if (annotation.enableBreaker()) {// 启用了断路器,记录断路器一次失败breaker.failed();}if (finalFallback != null) {// Fallback不为空,执行Fallback的降级方法return finalFallback.process(joinPoint.getArgs());}// Fallback为空,则抛异常throw new ProtectorException(String.format("%s.%s timeout", className, methodName));} catch (InterruptedException|ExecutionException e) {...}}// 接口未开启超时保护,直接执行return execute(joinPoint, fallback, breaker, annotation.enableBreaker());}// 被流控了,执行fallback逻辑}

如果@Protected中的enableTimeout属性为true,并且@Protected设置的timeout超时时间大于零,那么在CompletableFuture的supplyAsync方法中执行,并且get(annotation.timeout(), TimeUnit.MILLISECONDS)等待指定时长。

如果@Protected中的enableTimeout属性为false,或者@Protected设置的timeout超时时间等于零,那么就直接执行。

在这里插入图片描述

    private Object execute(ProceedingJoinPoint joinPoint, Fallback fallback, Breaker breaker, boolean enableBreaker) {try {Object result = doExecute(joinPoint, fallback);if (enableBreaker) {// 启用了断路器,记录一个成功计数breaker.success();}return result;} catch (Throwable throwable) {LOGGER.error("ProtectorAspect aroundMethod error:", throwable);if (enableBreaker) {// 启用了断路器,记录一个失败计数breaker.failed();}if (fallback != null) {// fallback不为null,调用降级逻辑return fallback.process(joinPoint.getArgs());}// fallback为null就抛异常throw new ProtectorException(throwable.getMessage());}}private Object doExecute(ProceedingJoinPoint joinPoint, Fallback fallback) throws Throwable {// 执行目标方法return joinPoint.proceed();}    

execute方法中执行目标方法,如果成功,则记录断路器一个成功计数,如果失败,则计数断路器一个失败计数。

如果执行失败的话(也就是超时或发生异常),如果fallback不为null(如果@Protected注解指定了Fallback实现类全限定名,就不为null),那么会执行Fallback实现类的降级处理方法。

在这里插入图片描述

降级回调

降级处理逻辑抽象成了一个接口Fallback。

/*** @author huangjunyi* @date 2023/12/18 19:51* @desc*/
@FunctionalInterface
public interface Fallback {Object process(Object[] args);}

Fallback的process就是降级处理逻辑,需要用户自定义Fallback实现类并实现process方法中自定义的降级处理逻辑。

当调用一个接口方法时,如果被流控,或者断路器处于打开状态,或者执行目标方法时超时或抛出异常,并且接口设置了降级回调,那么执行对应Fallback实现类的降级处理方法。

    @GetMapping("/fallback")@Protected(name = "fallback", fallback = "com.huangjunyi1993.simple.microservice.protector.example.fallback.TestFallback") // 测试降级效果public Map<String, Object> fallback() throws Exception {throw new Exception("test fallback");}

比如@Protected的fallback属性设置了Fallback的实现类TestFallback的全限定名,那么当执行目标方法时超时或抛出异常,就会执行TestFallback的process(Object[] args)方法。

在这里插入图片描述

代码以提交到gitee,可以自行下载阅读。
https://gitee.com/huang_junyi/simple-microservice/tree/master/simple-microservice-protector
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com