本文属于sentinel学习笔记系列。网上看到吴就业老师的专栏,写的好值得推荐,我整理的有所删减,推荐看原文。
https://blog.csdn.net/baidu_28523317/category_10400605.html
资源指标数据统计
回顾下责任链:Sentinel 学习笔记3-责任链与工作流程-CSDN博客
为了便于理解,在正式看 StatisticSlot代码之前,先看看责任链执行过程。
https://zhuanlan.zhihu.com/p/5675336888
链表中装配的节点对象类型为AbstractLinkedProcessorSlot,也就是图中这些Slot的父类,它声明了两个方法,entry、exit分别作为Slot的入口和出口,并且在方法入参中携带了当前调用链路的上下文信息。请求在进入处理链条后,按先后顺序会经过三类Slot,前置的NodeSelectorSlot、ClusterBuilderSlot用于调用链路的提取,中间StatisticSlot是通用的滑动窗口计数功能,比如RT、通过数、拒绝数、异常数等,后置的ParamFlowSlot、SystemSlot等则为真正的规则验证链条。如下图所示
AbstractLinkedProcessSlot中fireEntry和fireExit,主要是用于在当前节点entry或exit方法执行过程中触发对next节点的调度,这就使得Slot链表触发顺序和完成顺序并不一定相同。每个 ProcessorSlot 都有权决定是先等后续的 ProcessorSlot 执行完成再做自己的事情,还是先完成自己的事情再让后续 ProcessorSlot 执行。比如StatisticSlot 在统计指标数据之前会先调用后续的 ProcessorSlot,根据后续 ProcessorSlot 判断是否需要拒绝该请求的结果决定记录哪些指标数据。
StatisticSlot :
- entry:先调用 fireEntry 方法完成调用后续的 ProcessorSlot#entry 方法,根据后续的 ProcessorSlot 是否抛出 BlockException 决定记录哪些指标数据,并将资源并行占用的线程数加 1。
- exit:若无任何异常,则记录响应成功、请求执行耗时,将资源并行占用的线程数减 1。
entry 方法
正常情况
当后续的 ProcessorSlot 未抛出任何异常时,表示不需要拒绝该请求,放行当前请求。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// Do some checking. 先进行后面的slot进行流控/降级的判断fireEntry(context, resourceWrapper, node, count, prioritized, args);// Request passed, add thread count and pass count.node.increaseThreadNum();//并行线程数+1node.addPassRequest(count);//请求数+count(一般是1)// 如果调用来源不为空,也将调用来源的 StatisticNode 的当前并行占用线程数+1,通过请求数+1if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}//如果流量类型为 IN,增加资源全局唯一的 ClusterNode( 整个集群node)的统计信息if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}//回调所有 ProcessorSlotEntryCallback#onPass// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}}
其中:回调涉及的ProcessorSlotEntryCallback 接口的定义如下
public interface ProcessorSlotEntryCallback<T> {//在请求被放行时被回调执行void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception;//在请求被拒绝时被回调执行void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args);
}
异常PriorityWaitException
在需要对请求限流时,只有使用默认流量效果控制器才可能会抛出 PriorityWaitException 异常,说明当前请求已经被休眠了一会了,但请求还是允许通过的,只是不需要为 DefaultNode 记录这个请求的指标数据了,其他的自增,回调还有。
} catch (PriorityWaitException ex) {node.increaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}}
异常BlockException
} catch (BlockException e) {// Blocked, set block exception to current entry.// 没有通过。将异常信息保存到当前的entry里面context.getCurEntry().setBlockError(e);// Add block count. 增加拒绝数量node.increaseBlockQps(count);if (context.getCurEntry().getOriginNode() != null) {//调用来源不为空,让调用来源的 StatisticsNode 也记录当前请求被拒绝context.getCurEntry().getOriginNode().increaseBlockQps(count);}//流量类型为 INif (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.// 统计所有资源指标数据的 ClusterNode 也记录当前请求被拒绝Constants.ENTRY_NODE.increaseBlockQps(count);}// Handle block event with registered entry callback handlers. 处理回调for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;}
其它异常
其它异常并非指业务异常,因为此时业务代码还未执行,而业务代码抛出的异常是通过调用 Tracer#trace 方法记录的。此时不再统计异常。
} catch (Throwable e) {// Unexpected internal error, set error to current entry.context.getCurEntry().setError(e);throw e;}
可以看到不同的异常有不同的统计方式。一般限流导致的异常是BlockException 。
exit 方法
exit 方法被调用时,要么请求被拒绝,要么请求被放行并且已经执行完成,所以 exit 方法需要知道当前请求是否正常执行完成,这正是 StatisticSlot 在捕获异常时将异常记录到当前 Entry 的原因,exit 方法中通过 Context 可获取到当前 CtEntry,从当前 CtEntry 可获取 entry 方法中写入的异常
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {Node node = context.getCurNode();if (context.getCurEntry().getBlockError() == null) {// Calculate response time (use completeStatTime as the time of completion).long completeStatTime = TimeUtil.currentTimeMillis();context.getCurEntry().setCompleteTimestamp(completeStatTime);long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();Throwable error = context.getCurEntry().getError();// Record response time and success count. 记录执行耗时与成功总数recordCompleteFor(node, count, rt, error);recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);if (resourceWrapper.getEntryType() == EntryType.IN) { // 流量类型为 in 时recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);}}// Handle exit event with registered exit callback handlers. 回调Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();for (ProcessorSlotExitCallback handler : exitCallbacks) {handler.onExit(context, resourceWrapper, count, args);}// fix bug https://github.com/alibaba/Sentinel/issues/2374fireExit(context, resourceWrapper, count, args);}private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {if (node == null) {return;}node.addRtAndSuccess(rt, batchCount);node.decreaseThreadNum();// 自减当前资源占用的线程数if (error != null && !(error instanceof BlockException)) {node.increaseExceptionQps(batchCount);}}
entry没抛出异常,会记录下rt,也就是业务代码执行的响应时间,然后将资源对应的并发线程数-1操作。
资源指标数据的记录过程
ClusterNode 才是一个资源全局的指标数据统计节点,但是上面的StatisticSlot#entry 方法与 exit 方法中看到。因为 ClusterNode 被 ClusterBuilderSlot 交给了 DefaultNode 掌管,在 DefaultNode 的相关指标数据收集方法被调用时,ClusterNode 的对应方法也会被调用。
public class DefaultNode extends StatisticNode {/*** Associated cluster node.*/private ClusterNode clusterNode;...@Overridepublic void increaseExceptionQps(int count) {super.increaseExceptionQps(count);this.clusterNode.increaseExceptionQps(count);}@Overridepublic void addRtAndSuccess(long rt, int successCount) {super.addRtAndSuccess(rt, successCount);this.clusterNode.addRtAndSuccess(rt, successCount);}@Overridepublic void increaseThreadNum() {super.increaseThreadNum();this.clusterNode.increaseThreadNum();}@Overridepublic void decreaseThreadNum() {super.decreaseThreadNum();this.clusterNode.decreaseThreadNum();}@Overridepublic void addPassRequest(int count) {super.addPassRequest(count);this.clusterNode.addPassRequest(count);}
假设当前请求被成功处理,StatisticSlot 会调用 DefaultNode#addRtAndSuccess 方法记录请求处理成功、并且记录处理请求的耗时,DefaultNode 先调用父类StatisticNode的 addRtAndSuccess 方法,然后 DefaultNode 会调用 ClusterNode#addRtAndSuccess 方法.ClusterNode 与 DefaultNode 都是 StatisticNode 的子类
StatisticNode 中rollingCounterInSecond 是一个秒级的滑动窗口,rollingCounterInMinute 是一个分钟级的滑动窗口,类型为 ArrayMetric,com.alibaba.csp.sentinel.node.StatisticNode#addRtAndSuccess
public void addRtAndSuccess(long rt, int successCount) {rollingCounterInSecond.addSuccess(successCount);//秒级rollingCounterInSecond.addRT(rt);rollingCounterInMinute.addSuccess(successCount);//分钟级rollingCounterInMinute.addRT(rt);}
com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#addSuccess
public void addSuccess(int count) {WindowWrap<MetricBucket> wrap = data.currentWindow();wrap.value().addSuccess(count);}
com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket#addSuccess
public void addSuccess(int n) {add(MetricEvent.SUCCESS, n);}
MetricBucket 使用 LongAdder 记录各项指标数据的值,在 MetricEvent 枚举类中定义了 Sentinel 会收集哪些指标数据,
public enum MetricEvent {/*** Normal pass. 请求被放行*/PASS,/*** Normal block. 请求被拒绝*/BLOCK,EXCEPTION, //异常SUCCESS, //处理成功RT, //耗时/*** Passed in future quota (pre-occupied, since 1.5.0).* 预通过总数*/OCCUPIED_PASS
}
数据统计小结:
首先 NodeSelectorSlot 为资源创建 DefaultNode,将 DefaultNode 向下传递,ClusterBuilderSlot 负责给资源的 DefaultNode 加工,添加 ClusterNode 这个零部件,再将 DefaultNode 向下传递给 StatisticSlot。
- 一个调用链路上只会创建一个 Context,在调用链路的入口创建(一个调用链路上第一个被 Sentinel 保护的资源)。
- 一个 Context 名称只创建一个 EntranceNode,也是在调用链路的入口创建,调用 Context#enter 方法时创建。
- 与方法调用的入栈出栈一样,一个线程上调用多少次 SphU#entry 方法就会创建多少个 CtEntry。
- 一个调用链路上,如果多次调用 SphU#entry 方法传入的资源名称都相同,那么只会创建一个 DefaultNode,如果资源名称不同,会为每个资源名称创建一个 DefaultNode,当前 DefaultNode 会作为调用链路上的前一个 DefaultNode 的子节点。
- 一个资源有且只有一个 ProcessorSlotChain,一个资源有且只有一个 ClusterNode。
- 一个 ClusterNode 负责统计一个资源的全局指标数据。
- StatisticSlot 负责记录请求是否被放行、请求是否被拒绝、请求是否处理异常、处理请求的耗时等指标数据,在 StatisticSlot 调用 DefaultNode 用于记录某项指标数据的方法时,DefaultNode 也会调用 ClusterNode 的相对应方法,完成两份指标数据的收集。
- DefaultNode 统计当前资源的各项指标数据的维度是同一个 Context(名称相同),而 ClusterNode 统计当前资源各项指标数据的维度是全局。