Sentinel源码解析——FlowSlot流控规则
- StatisticNode与StatisticSlot
- StatisticNode内部结构
- StatisticSlot
- FlowSlot流控规则
在前面的文章,我们对Sentinel的原理进行了分析,Sentinel底层使用了责任链模式,这个责任链就是ProcessorSlotChain对象,链中的每个节点都是一个ProcessorSlot,每个ProcessorSlot对应一个规则的处理。
然后我们又对Sentinel的整体流程进行了源码分析,我们分析了Sentinel的ProcessorSlotChain对象默认的构成:
但是我们没有对每个slot进行深入的分析,本篇文章就对它们进行深入的了解。
StatisticNode与StatisticSlot
FlowSlot是根据StatisticSlot中的统计数据进行流控规则校验的,而StatisticSlot的统计数据又是维护在StatisticNode对象中,呈现以下关系:
StatisticNode内部结构
public class StatisticNode implements Node {// 统计1秒内数据的滑动时间窗,1秒内有两个时间窗格,每个时间窗格的时间跨度是500msprivate transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);// 统计1分钟内数据的滑动时间窗,1分钟内有60个时间窗格,每个时间窗格的时间跨度时1sprivate transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);/*** 当前并发线程数计数器*/private LongAdder curThreadNum = new LongAdder();...}
StatisticNode有三个成员变量,其中rollingCounterInSecond和rollingCounterInMinute都是滑动时间窗计数器,用于分别统计1秒内和1分钟内的数据。
而curThreadNum则是一个LongAdder类型的并发线程数计数器。
rollingCounterInSecond和rollingCounterInMinute都是Metric类型,Metric是一个接口,实现类是ArrayMetric。
public class ArrayMetric implements Metric {private final LeapArray<MetricBucket> data;...}
ArrayMetric里面是一个LeapArray<MetricBucket>。
public abstract class LeapArray<T> {protected int windowLengthInMs;protected int sampleCount;protected int intervalInMs;private double intervalInSecond;protected final AtomicReferenceArray<WindowWrap<T>> array;public LeapArray(int sampleCount, int intervalInMs) {...// 单个时间窗口长度 500msthis.windowLengthInMs = intervalInMs / sampleCount;// 以毫秒为单位的计数器统计的时间跨度 1000msthis.intervalInMs = intervalInMs;// 以秒为单位的计数器统计的时间跨度 1sthis.intervalInSecond = intervalInMs / 1000.0;// 时间窗口个数 2个this.sampleCount = sampleCount;// 时间窗口数组this.array = new AtomicReferenceArray<>(sampleCount);}}
LeapArray中的成员属性都是根据构造器参数sampleCount(时间窗口个数)和intervalInMs(以毫秒为单位的计数器统计的时间跨度)计算得出的。
其中sampleCount和intervalInMs直接作为LeapArray的成员属性保存。
windowLengthInMs单个时间窗口长度,自然是intervalInMs除以sampleCount获得。rollingCounterInSecond的intervalInMs是1000,sampleCount是2,因此windowLengthInMs为500,表示500ms一个时间窗口。
intervalInSecond是以秒为单位的计数器统计的时间跨度,因此是intervalInMs除以1000,这里的intervalInSecond就是1,代表当前计数器统计的时间范围是1秒。
array是一个AtomicReferenceArray,他就是时间窗口数组,每个窗口是一个WindowWrap对象,泛型是MetricBucket,sampleCount就是AtomicReferenceArray的数组长度,因此是两个时间窗口。
WindowWrap里面的这个MetricBucket就是真正记录统计数值的。
public class MetricBucket {// 记录统计数组的数组,每个下标对应一个指标的统计值private final LongAdder[] counters;public MetricBucket() {MetricEvent[] events = MetricEvent.values();this.counters = new LongAdder[events.length];for (MetricEvent event : events) {counters[event.ordinal()] = new LongAdder();}}...
}
MetricBucket里面还有结构,并不是一个单一的value,而是一个LongAdder[],数组中的每个LongAdder对应一个指标的统计,具体有哪些指标,可以查看MetricEvent中的枚举。
public enum MetricEvent {// 规则校验通过数PASS,// 规则校验失败数BLOCK,// 异常数EXCEPTION,// 成功数SUCCESS,// 所有成功调用的响应时间RT,OCCUPIED_PASS
}
StatisticSlot
@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {fireEntry(context, resourceWrapper, node, count, prioritized, args);// 增加当前线程数node.increaseThreadNum();// 增加规则校验通过数node.addPassRequest(count);...} catch (PriorityWaitException ex) {...} catch (BlockException e) {...// 增加流控规则校验失败计数node.increaseBlockQps(count);...} catch (Throwable e) {// 设置异常到contextcontext.getCurEntry().setError(e);throw e;}}
StatisticSlot的entry方法与其他的slot处理流程不大一样,它是先调用fireEntry方法让slot链继续往后执行。然后后面才进行相关指标的统计。
@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {Node node = context.getCurNode();if (context.getCurEntry().getBlockError() == null) {// 计算响应时间rtlong completeStatTime = TimeUtil.currentTimeMillis();context.getCurEntry().setCompleteTimestamp(completeStatTime);long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();// 从context中取出error,(如果抛异常,上面的entry方法会设置到context中)Throwable error = context.getCurEntry().getError();recordCompleteFor(node, count, rt, error);// ...}// ...fireExit(context, resourceWrapper, count, args);}private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {...// 增加响应时间和成功数node.addRtAndSuccess(rt, batchCount);// 减去当前线程数node.decreaseThreadNum();if (error != null && !(error instanceof BlockException)) {// 如果有异常,增加异常数node.increaseExceptionQps(batchCount);}}
StatisticSlot的exit方法最后做的就是增加响应时间和成功数以及减去当前线程数;如果context中有异常(就是entry方法塞进去的)还会增加异常数。
我们发现StatisticSlot做的这些指标统计,全是调用node对象的方法,这个node对象就是StatisticNode。
node.increaseThreadNum()增加并发线程数:
StatisticNode#increaseThreadNum()
@Overridepublic void increaseThreadNum() {curThreadNum.increment();}
并发线程数是直接加到StatisticNode中的curThreadNum变量中。
而其他的指标都是加到滑动时间窗计数器里面,我们挑一个增加规则校验通过数的node.addPassRequest(count)来看。
StatisticNode#addPassRequest():
@Overridepublic void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}
两个滑动时间窗计数器都增加。
ArrayMetric#addPass(int)
@Overridepublic void addPass(int count) {// 根据当前时间戳定位对应的时间窗口WindowWrap<MetricBucket> wrap = data.currentWindow();// 时间窗口中的paas计数加1wrap.value().addPass(count);}
先是根据当前时间戳定位对应的时间窗口,然后把时间窗口中的pass计数加1。
看下是如何根据当前时间戳定位对应的时间窗口的:
LeapArray#currentWindow()
public WindowWrap<T> currentWindow() {return currentWindow(TimeUtil.currentTimeMillis());}public WindowWrap<T> currentWindow(long timeMillis) {// 根据当前时间戳计算时间窗数组下标int idx = calculateTimeIdx(timeMillis);// 计算窗口开始时间windowStartlong windowStart = calculateWindowStart(timeMillis);while (true) {// 根据下标取得时间窗WindowWrap<T> old = array.get(idx);if (old == null) {// 定位到的时间窗口为空,创建,窗口开始时间就是windowStartWindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));...} else if (windowStart == old.windowStart()) {// 时间窗口的开始时间等于windowStart,表示这个时间窗没过期,返回该时间窗return old;} else if (windowStart > old.windowStart()) {// 时间窗口的开始时间小于windowStart,表示这个时间窗已过期,重置里面的计数,然后再返回这个时间窗口return resetWindowTo(old, windowStart);} ...}}
首先根据当前时间戳计算时间窗数组下标idx,通过下标就可以取得对应的时间窗old = array.get(idx)。
除此以外,还会根据当前时间戳计算一个窗口开始时间windowStart,然后每个时间窗创建的时候都会记录一个开始时间old.windowStart(),两个开始时间一比较,就可得知当前时间窗是否已过期。如果old.windowStart()小于windowStart,那么表示时间窗口old已经过期了。
根据当前时间戳计算目标窗口下标:
LeapArray#calculateTimeIdx(long)
private int calculateTimeIdx(long timeMillis) {// 当前时间戳除以当个窗口的时间跨度(500ms),得到timeIdlong timeId = timeMillis / windowLengthInMs;// timeId对时间窗数组长度取模,得到下标idxreturn (int)(timeId % array.length());}
根据当前时间戳计算窗口开始时间windowStart:
LeapArray#calculateWindowStart(long)
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {// 当前时间戳 - 当前时间戳 % 当个时间窗的时间跨度return timeMillis - timeMillis % windowLengthInMs;}
得到时间窗后,就是执行“wrap.value().addPass(count);”这行代码,首先时间窗WindowWrap的泛型是MetricBucket,因此wrap.value()取到的是MetricBucket对象,然后调用MetricBucket的addPass(count)方法。
MetricBucket#addPass(int)
public void addPass(int n) {add(MetricEvent.PASS, n);}public MetricBucket add(MetricEvent event, long n) {// counters是个LongAdder[]// PASS对应枚举值0// 因此这里就是对MetricBucket中的LongAdder数组counters中下标为0的LongAdder增加ncounters[event.ordinal()].add(n);return this;}
我们上面已经说过MetricBucket中是一个LongAdder[]记录不同指标的计数。而Event.PASS对应的枚举值是0,因此这里就是对LongAdder数组counters中下标为0的LongAdder增加n,正好对应的就是paas指标的LongAdder。
FlowSlot流控规则
FlowSlot的流控规则校验逻辑全在entry方法中,而exit直接调用fireExit方法往下走,因此我们只看FlowSlot的entry方法即可。
FlowSlot#entry(…)
@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {// 流控规则校验checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(...);}
FlowSlot#checkFlow(…)
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {// 流控规则校验checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}
FlowRuleChecker#checkFlow(…)
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {...// 从ruleProvider中根据资源名称取得对应的流控规则集合Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {for (FlowRule rule : rules) {// 逐个校验每个流控规则,一旦有一个校验不通过,则抛出FlowExceptionif (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}
沿着FlowSlot的entry方法一路进来,到FlowRuleChecker的checkFlow方法。主体流程就是先根据资源名称取到对应的流控规则集合,然后再遍历这个流控规则集合,逐一校验每个流程规则,如果有哪一个规则校验没有通过,那么就抛出FlowException。
FlowRuleChecker#canPassCheck(FlowRule, …)
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {...// 单个流控规则校验return passLocalCheck(rule, context, node, acquireCount, prioritized);}private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {...// 单个流控规则校验return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}
“return rule.getRater().canPass(selectedNode, acquireCount, prioritized);”这一行代码有可能进入不同的实现类,视我们选择的“流控效果”而定。
- DefaultController#canPass(Node, int, boolean):快速失败(滑动时间窗算法)
- WarmUpController#canPass(Node, int, boolean):Warm Up(令牌桶算法)
- RateLimiterController#canPass(Node, int, boolean):排队等待(漏桶算法)
DefaultController#canPass(Node, int, boolean):
@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {// 当前的QPS(或者并发线程数,看我们选的“阈值类型”是什么)int curCount = avgUsedTokens(node);// 加上要增加的数目,如果超了,返回false表示校验失败,没超则返回true表示校验通过if (curCount + acquireCount > count) {...return false;}return true;}
DefaultController#canPass(Node, int, boolean)方法是流控效果为“快速失败”对应的流控规则校验类,使用的时滑动时间窗算法。首先计算获取当前的QPS或者并发线程数,这个视乎我们选的“阈值类型”而定,然后加上当前要申请的数目acquireCount,如果超过了阈值,返回false表示校验失败,没超则返回true表示校验通过。
DefaultController#avgUsedTokens(Node):
private int avgUsedTokens(Node node) {...// node.curThreadNum()取得的时并发线程数// node.passQps()取得的是QPSreturn grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}
node.curThreadNum()其实就是取的StatisticNode中curThreadNum属性的值,它是一个LongAdder类型,上面已经介绍过。
、StatisticNode#curThreadNum()
@Overridepublic int curThreadNum() {return (int)curThreadNum.sum();}
node.passQps()则要进行计算。
StatisticNode#passQps()
@Overridepublic double passQps() {return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();}
rollingCounterInSecond.pass()取得的是所有的时间窗口pass计数的汇总:
ArrayMetric#pass()
@Overridepublic long pass() {// 这里是判断如果当前的时间窗口过期,则重置它data.currentWindow();// 所有的时间窗口的paas计数加总到passlong pass = 0;List<MetricBucket> list = data.values();for (MetricBucket window : list) {pass += window.pass();}return pass;}
window.pass()则是取得单个MetricBucket中的pass计数。
MetricBucket#pass():
public long pass() {return get(MetricEvent.PASS);}public long get(MetricEvent event) {return counters[event.ordinal()].sum();}
这样就取得了计数器中每个时间窗口的pass计数加总后的值,但这个并不是QPS,因为有可能这个计数器的时间跨度是大于1秒的,因此还要除以rollingCounterInSecond.getWindowIntervalInSec()。
rollingCounterInSecond.getWindowIntervalInSec()取得的是整个计数器的时间跨度(以秒为单位):
ArrayMetric#getWindowIntervalInSec()
@Overridepublic double getWindowIntervalInSec() {return data.getIntervalInSecond();}
data是LeapArray类型,进入LeapArray的getIntervalInSecond方法:
LeapArray#getIntervalInSecond()
public double getIntervalInSecond() {return intervalInSecond;}
返回的是LeapArray中的intervalInSecond,也就是计数器统计的时间跨度(以秒为单位),那么“rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec()”得到的就是平均每秒的通过数(pass),也就是QPS。
以下是FlowSlot流控规则校验的整体流程,结合StatisticNode内部结构的那张大图看,思路就非常清晰了。