Sentinel学习笔记:sentinel-core

有的没的

有蛮长时间没更新了,笔记一直有在做,不过暂时都放在本地了,因为有些还不太完整(选定的每篇范围太大也是一个原因)。加上最近在实习,大部分笔记放在内部的笔记软件中。不过因为Sentinel有很大一部分是开源的,所以将笔记中有关开源的分析部分重新整理出来公开了,这段时间应该陆续会有笔记被放到这边。 文中没标注的部分资料(图片)来源于文末参考链接。

经典限流算法

计数器算法

Sentinel 中默认实现的 QPS 限流算法和 THREADS 限流算法都属于计数器算法。QPS 限流的默认算法是通过判断当前时间窗口(1 秒)的 pass(被放行的请求数量)指标数据判断,如果 pass 总数已经大于等于限流的 QPS 阈值,则直接拒绝当前请求,每通过一个请求当前时间窗口的 pass 指标计数加 1。THREADS 限流的实现是通过判断当前资源并行占用的线程数是否已经达到阈值,是则直接拒绝当前请求,每通过一个请求 THREADS 计数加 1,每完成一个请求 THREADS 计数减 1。

漏桶算法(Leaky Bucket)

漏桶就像在一个桶的底部开一个洞,不控制水放入桶的速度,而通过底部漏洞的大小控制水流失的速度,当水放入桶的速率小于或等于水通过底部漏洞流出的速率时,桶中没有剩余的水,而当水放入桶的速率大于漏洞流出的速率时,水就会逐渐在桶中积累,当桶装满水时,若再向桶中放入水,则放入的水就会溢出。我们把水换成请求,往桶里放入请求的速率就是接收请求的速率,而水流失就是请求通过,水溢出就是请求被拒绝。

令牌桶算法(Token Bucket)

令牌桶不存放请求,而是存放为请求生成的令牌(Token),只有拿到令牌的请求才能通过。原理就是以固定速率往桶里放入令牌,每当有请求过来时,都尝试从桶中获取令牌,如果能拿到令牌请求就能通过。当桶放满令牌时,多余的令牌就会被丢弃,而当桶中的令牌被用完时,请求拿不到令牌就无法通过。

数据模型

核心类

ArrayMetric

Metric的实现类,数据节点的addRtAndSuccess最后会落到该类上,该类也是sentinel记录数据模型的最外层包装

内部持有一个LeapArray<MetricBucket> data数据结构作为对窗口的包装,所有操作都会落到该数据结构上

构造方法通过传入参数的差异,为data赋值不同子类的引用(秒级窗口OccupiableBucketLeapArray,分钟级窗口BucketLeapArray)

LeapArray

内部持有一个AtomicReferenceArray<WindowWrap<T>>的数据结构作为实际容纳窗口的容器,各种操作会在该类中获取对应的WindowWrap<T>再进行相关操作

成员变量

protected int windowLengthInMs;

每个窗口长度(Ms)

protected int sampleCount;

滑动窗口内样本个数

protected int intervalInMs;

LeapArray总时间(Ms)

private double intervalInSecond;

LeapArray总时间(s)

protected final AtomicReferenceArray<WindowWrap> array;

private final ReentrantLock updateLock = new ReentrantLock();

更新WindowWrap<T>的锁

WindowWrap

窗口包装类,主要用于包装MetricBucket,记录窗口元信息

成员变量

private final long windowLengthInMs;

窗口长度

private long windowStart;

窗口开始时间

private T value;

被包装的类

MetricBucket

实际数据存放类

维护了一个LongAdder数组counters,用于记录各种数据,包括以下几种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public enum MetricEvent {
/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,
/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
}

核心方法

获取数据(以success为例)

com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#success

获取窗口内的success个数

1
2
3
4
5
6
7
8
9
10
11
12
13
public long success() {
//方法本身是用来获取当前时间所在窗口,在这边被用于更新当前时间所在窗口
data.currentWindow();
long success = 0;

//获取整个窗口内的所有MetricBucket
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
//累加计算总和
success += window.success();
}
return success;
}

com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow(long)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}

int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);

/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

注释非常详细,需要注意的是resetWindowTo方法,不同子类有不同的实现

  • BucketLeapArray:重置窗口开始时间,将各种数据置为0
  • OccupiableBucketLeapArray:重置窗口开始时间,将borrowArray中对应时间的MetricBucket的数据作为窗口的数据,而不是直接归零(具体作用在流控中解释)

com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#values()

com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#values(long)

获取窗口内的所有有效的(非过期)MetricBucket,组成list返回

增加数据(以addSuccess为例)

com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#addSuccess

调用currentWindow获取当前窗口后调用value()获取MetricBucket再调用addSuccess(count)方法

滑动窗口类型

分钟级滑动窗口

窗口总长度60秒,每个样本窗口1秒,总共60个样本窗口

private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

秒级滑动窗口

窗口总长度1秒,每个样本窗口500ms,总共2个样本窗口

private transient volatile Metric rollingCounterInSecond =
new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);

PS:秒级窗口不精确,监控中的秒级数据来自于分钟级窗口,只有需要最近1秒的数据时才会使用秒级滑动窗口的值(详情可以看文首第二个链接)

切入流程

(以SpringBoot版本为例)

通过AOP机制织入

com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}

@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {

//获取方法
Method originMethod = resolveMethod(pjp);

//获取注解
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
// Should not go through here.
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}

//获取资源名称
String resourceName = getResourceName(annotation.value(), originMethod);
//获取类型(OUT or IN)
EntryType entryType = annotation.entryType();
Entry entry = null;
try {
//核心部分,入口
entry = SphU.entry(resourceName, entryType, 1, pjp.getArgs());
//通过,允许调用
Object result = pjp.proceed();
return result;
} catch (BlockException ex) {
//不通过,不允许调用
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
Tracer.trace(ex);
throw ex;
} finally {
if (entry != null) {
//退出入口
entry.exit();
}
}
}
}

Entry

LearningGp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {

//获取ThreadLocal中已经存在的context
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}

if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}

// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}

//通过SPI机制加载处理链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}

//构造Entry实例
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
/进入处理链
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}

之后在处理链指针不为空的情况下按序调用进入方法并返回

核心类

Context

Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。Context 维持着入口节点(entranceNode)、本次调用链路的 curNode、调用来源(origin)等信息。Context 名称即为调用链路入口名称。

Context 通过 ThreadLocal 传递,只在调用链路的入口处创建。

Context 的字段说明:

  • name:Context 的名称。
  • entranceNode:当前调用树的入口节点,类型为 EntranceNode。同一个入口的资源,每个资源对应一个 DefaultNode,entranceNode#childList 用于存储这些资源的 DefaultNode。
  • curEntry:当前 Entry(CtEntry)。
  • origin:调用来源的名称,即服务消费者的名称或者服务消费者的来源 IP,取决于服务消费者是否使用 Sentinel,由 Sentinel 适配层传递过来。例如:服务提供者是 Spring MVC 应用,且服务提供者使用 Sentinel 的 Web MVC 适配,那么 Sentinel 会尝试从请求头获取"S-user",如果服务消费者有在请求头传递这个参数,那么就能够获取到

Entry

在调用 Context#getCurNode 方法获取调用链路上当前访问到的资源的 DefaultNode 时,实际是从 Context#curEntry 获取的,Entry 维护了当前资源的 DefaultNode,以及调用来源的 StatisticNode。

CtEntry 是 Entry 的直接子类

CtEntry 用于维护父子 Entry,每一次调用 SphU#entry 方法都会创建一个 CtEntry。如果服务 B 在处理一个请求的路径上会多次调用 SphU#entry,那么这些 CtEntry 会构成一个双向链表。在每次创建 CtEntry,都会将 Context.curEntry 设置为这个新的 CtEntry,双向链表的作用就是在调用 CtEntry#exit 方法时,能够将 Context.curEntry 还原为上一个资源的 CtEntry。

Node

用于持有实时统计的指标数据

来源见参考链接

Node实际上是数据节点,主要用于统计各种规则所需要的数据,如QPS,线程数等等。各个Node因为记录数据的不同有以下四个实现

StatisticNode

数据统计节点,核心数据是以下三个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
* by given {@code sampleCount}.
* 秒级滑动窗口,用于统计实时的 QPS,2 个时间窗口大小为 500 毫秒的 Bucket
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);

/**
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
* 分级滑动窗口,用于保存最近一分钟内的历史指标数据,60 个 Bucket 数组,每个 Bucket 统计的时间窗口大小为 1 秒
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
* The counter for thread count.
* 并发线程数
*/
private LongAdder curThreadNum = new LongAdder();

滑动窗口使用场景

  • 获取前一秒被 Sentinel 拒绝的请求总数从分钟级滑动窗口获取
  • 获取当前一秒内已经被 Sentinel 拒绝的请求总数则从秒级滑动窗口获取
  • 获取最小耗时也是从秒级的滑动窗口获取

StatisticNode 还负责统计并行占用的线程数,用于实现信号量隔离,按资源所能并发占用的最大线程数实现限流。通过控制并发线程数实现信号量隔离的好处就是不让一个接口同时使用完线程池所有线程

数据统计部分采用改进的滑动窗口的方式,时间窗口+Bucket,通过循环复用 Bucket 以减少 Bucket 的创建和销毁。在统计指标数据时,利用当前时间戳定位 Bucket,使用 LongAdder 统计时间窗口内的请求成功数、失败数、总耗时等指标数据优化了并发锁。Sentinel 通过定时任务递增时间戳以获取当前时间戳,避免了每次获取时间戳都使用 System 获取的性能消耗。//todo

DefaultNode

resource * context纬度数据统计节点,存在每个 NodeSelectorSlot 的map里

DefaultNode 字段说明:

  • id:资源 ID,ResourceWrapper 对象。
  • childList:childList 是一个 Node(DefaultNode)集合,用于存放子节点。
  • clusterNode:clusterNode 字段是一个 ClusterNode,ClusterNode 也是 StatisticNode 的子类。

EntranceNode

入口节点,名称相同的 Context 都使用同一个 EntranceNode

来源见参考链接

ClusterNode

统计每个资源全局的指标数据,以及统计该资源按调用来源区分的指标数据。全局数据指的是不区分调用链路,一个资源 ID 只对应一个 ClusterNode。

ClusterNode 字段说明:

  • name:资源名称
  • resourceType:资源类型
  • originCountMap:维护每个调用来源的指标数据统计数据(StatisticNode)

Root与调用树

Constants 常量类用于声明全局静态常量,Constants 有一个 ROOT 静态字段,类型为 EntranceNode。

在调用 ContextUtil#enter 方法时,如果还没有为当前入口创建 EntranceNode,则会为当前入口创建 EntranceNode,将其赋值给 Context.entranceNode,同时也会将这个 EntranceNode 添加到 Constants.ROOT 的子节点(childList)。资源对应的 DefaultNode 则是在 NodeSelectorSlot 中创建,并赋值给 Context.curEntry.curNode。

Constants.ROOT、Context.entranceNode 与 Entry.curNode 三者关系如下图所示。

来源见参考链接

Slot

处理链上的插槽,核心逻辑所在

整体处理基于责任链模式,通过spi机制按顺序加载处理链,所有ProcessorSlot都继承AbstractLinkedProcessorSlot 类,从而组成单向链表,调用 fireEntry 方法逐个前进或者 fireExit 方法逐个后退

主要分成两类,一类数据统计(NodeSelectorSlot->ClusterBuilderSlot->StatisticSlot(logslot辅助用,暂时放在这类)),一类实现降级以及block

数据统计Slot

NodeSelectorSlot

这个 slot 主要负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径进行流量控制。

  • 为当前资源创建 DefaultNode,并且将 DefaultNode 赋值给 Context.curEntry.curNode
  • 如果当前调用链路上只出现过一次 SphU#entry 的情况,将该 DefaultNode 添加到的 Context.entranceNode 的子节点,否则添加到 Context.curEntry.parent 的子节点(通过com.alibaba.csp.sentinel.context.Context#getLastNode实现)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void firstLevel(){
ContextUtil.enter("level-test","application");
try (Entry entry = SphU.entry("firstLevel")){
System.out.println("firstLevel");
secondLevel();
} catch (BlockException e) {
throw new RuntimeException("firstLevel");
}
}

private void secondLevel(){
ContextUtil.enter("level-test","application");
try (Entry entry = SphU.entry("secondLevel")){
System.out.println("secondLevel");
} catch (BlockException e) {
throw new RuntimeException("secondLevel");
}
}

//此时调用firstLevel方法会形成如下调用树

ROOT (machine-root)
/
EntranceNode (context name: level-test)
/
DefaultNode (resource name:firstLevel)
/
DefaultNode (resource name:firstLevel)

ClusterBuilderSlot

这一环的主要作用是构建 ClusterNode,为资源纬度的统计节点

  • 如果当前资源未创建 ClusterNode,则为资源创建 ClusterNode;
  • 将 ClusterNode 赋值给当前资源的 DefaultNode.clusterNode;
  • 如果调用来源(origin)不为空,则为调用来源创建 StatisticNode,用于实现按调用来源统计资源的指标数据,ClusterNode 持有每个调用来源的 StatisticNode。

ps:ClusterBuilderSlot持有非静态private volatile ClusterNode clusterNode = null;,因为一个资源只会创建一个 ProcessorSlotChain,意味着 ClusterBuilderSlot 也只会创建一个,那么让 ClusterBuilderSlot 持有该资源的 ClusterNode 就可以省去每次都从 Map 中获取的步骤

LogSlot

进入时直接fire,在后续节点抛出BlockException后在这里将相关日志记录

StatisticSlot

负责进行数据统计,也是先fire,在后续节点处理完之后,在这里对统计的数据进行记录

entry 方法
请求正常通过
  • 当前资源并行占用的线程数增加 1、当前时间窗口被放行的请求总数加 1
  • 如果调用来源不为空,也将调用来源的 StatisticNode 的当前并行占用线程数加 1、当前时间窗口被放行的请求数加 1
  • 如果流量类型为 IN,则将资源全局唯一的 ClusterNode 的并行占用线程数、当前时间窗口被放行的请求数都增加 1
  • 回调所有 ProcessorSlotEntryCallback#onPass 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
node.increaseThreadNum();
node.addPassRequest(count);

if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}

if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}

// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
捕获PriorityWaitException异常

(特殊情况)

当捕获到 PriorityWaitException 异常时,说明当前请求已经被休眠了一会了,但请求还是允许通过的

  • 不需要为 DefaultNode 记录这个请求的指标数据
  • 自增当前资源并行占用的线程数
  • 为 ClusterNode 自增并行占用的线程数
  • 回调所有 ProcessorSlotEntryCallback#onPass 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}

if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
捕获到 BlockException 异常
  • 将异常记录到调用链路上下文的当前 Entry(StatisticSlot 的 exit 方法会用到)
  • 调用 DefaultNode#increaseBlockQps 方法记录当前请求被拒绝
  • 将当前时间窗口的 block qps 这项指标数据的值加 1
  • 如果调用来源不为空,让调用来源的 StatisticsNode 也记录当前请求被拒绝
  • 如果流量类型为 IN,则让用于统计所有资源指标数据的 ClusterNode 也记录当前请求被拒绝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);

// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}

if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}

// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}

throw e;
捕获到其他异常
  • 让 DefaultNode 记录当前请求异常
1
2
3
4
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);

throw e;
exit 方法

由于StatisticSlot 在捕获异常时将异常记录到当前 Entry,exit 方法中通过 Context 可获取到当前 CtEntry,从当前 CtEntry 可获取 entry 方法中写入的异常,从而得知请求的具体状况,完成相应操作

  • 计算耗时
  • 记录执行耗时与成功总数
  • 自减当前资源占用线程数
  • 来源不为空,减少来源的线程数
  • 流量为In,让用于统计所有资源指标数据的 ClusterNode 也记录相关信息
  • 调用回调方法

PS:在 DefaultNode 的相关指标数据收集方法被调用时,ClusterNode 的对应方法也会被调用

限流降级以及流控Slot

AuthoritySlot

权限控制,根据 origin 做黑白名单的控制

SystemSlot

实现自适应限流(针对全局入口流量)
com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem

主要比较参数有successQps,curThreadNum,avgRt,highestSystemLoad,highestCpuUsage

通过起一个后台线程(SystemMetricCollectorTask),每秒查询一次系统负载和CPU使用负载

第一版基于TCP BBR算法(结合自动化控制理论优化中PID controller)

第二版将系统指标(load/CPU usage)作为一个启动条件

com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}

// for inbound traffic only
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}

// total qps
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}

// total thread
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}

double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}

// load. BBR algorithm.
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}

// cpu usage
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}

FlowSlot

负责流控功能的实现

由 ProcessorSlot、Checker、Rule、RuleManager 组合完成,ProcessorSlot作为入口,并持有对应的Checker,Checker根据Rule进行检查,RuleManager管理Rule

主要步骤如下

  1. 在 ProcessorSlot#entry 方法中调用 Checker#check 方法,并将 DefaultNode 传递给 Checker。
  2. Checker 根据资源名称从 RuleManager 获取为该资源配置的规则。
  3. Checker 从 传入context,以及node中根据rule和策略获取需要的node
    1. 如果当前限流规则的 limitApp 不为 default,该限流规则只针对指定调用来源限流。当调用来源与当前限流规则的 limitApp 相等时:
      1. strategy 为 STRATEGY_DIRECT,则使用调用来源的 StatisticNode,实现针对调用来源限流。
      2. strategy 为 STRATEGY_RELATE:根据限流规则配置的 refResource 获取引用资源的 ClusterNode,即使用引用资源的指标数据限流。通俗点说就是使用其它资源的指标数据限流,你的并发量高我就限流,让你多处理一点请求,等你并发量降低了,我就不限流了;
      3. strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode,实现按调用链路的资源指标数据限流。
    2. 当 limitApp 为 default 时,针对所有来源限流。
      1. strategy 为 STRATEGY_DIRECT,则使用当前资源的 ClusterNode。
      2. strategy 为 STRATEGY_RELATE:使用引用资源的 ClusterNode;
      3. strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode。
    3. 如果 limitApp 为 other,且该资源的所有限流规则都没有针对当前的调用来源限流。
      1. 如果 strategy 为 STRATEGY_DIRECT,则使用 origin 的 StatisticNode。
    4. strategy 为 STRATEGY_RELATE:使用引用资源的 ClusterNode
    5. strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode
  4. 根据配置调用对应的controller的canPass方法
  5. 从node中获取当前时间窗口的某项指标数据(QPS、avgRt 等)与规则的阈值对比,如果达到规则的阈值则抛出 Block 异常。
TrafficShapingController

实现流量塑形的controller主要有以下几种

DefaultController

实现快速失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//如果当前规则的限流阈值类型为 QPS,则返回node当前时间窗口统计的QPS;
//如果当前规则的限流阈值类型为 THREADS,则返回node统计的当前并行占用的线程数。
int curCount = avgUsedTokens(node);
//判断加上acquireCount是否超过阈值,超过阈值则拒绝(prioritized除外)
if (curCount + acquireCount > count) {
//如果限流阈值类型为 QPS且为优先流量,可以占用之后的流量(并非直接执行,需要等待对应时间)
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
//计算占用之后流量后需要等待的时间(只允许占用1秒)
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
//判断是否超过等待最长时间
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
//允许占用
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);

// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}

public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
//最多可占用令牌数(1s产生的令牌)
double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
//当前秒级窗口已经被占用令牌数
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
return OccupyTimeoutProperty.getOccupyTimeout();
}

//样本窗口长度
int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
//currentTime - currentTime % windowLength 当前时间所在样本窗口开始时间
//当前窗口开始时间,也是要占用的第一个样本窗口的开始时间
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;

int idx = 0;
/*
* Note: here {@code currentPass} may be less than it really is NOW, because time difference
* since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
* lead more tokens be borrowed.
*/
//当前秒级窗口通过的请求数
long currentPass = rollingCounterInSecond.pass();
while (earliestTime < currentTime) {
//计算等待时间,等待到当前窗口结束(因为一个样本窗口可以存在于多个未来窗口中,当idx为0时,当前样本窗口为当前未来窗口的最后一个样本窗口,为1时当前样本窗口为当前未来窗口的倒数第二个样本窗口)
/*
*|--1--|--2--|--3--|
*idx=0 当前样本窗口为3,等待到结束
*idx=1 当前样本窗口为2,同样等待到结束,比idx=1时多一个样本窗口时间长度
*/
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
//超时退出
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
//目前所在未来窗口的通过数
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
//当前窗口通过数+当前窗口被占用数+需要的令牌数-目前所在未来窗口的通过数(重复计算所以减去)<=最多可占用令牌数
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
//占用窗口后移
earliestTime += windowLength;
currentPass -= windowPass;
idx++;
}

return OccupyTimeoutProperty.getOccupyTimeout();
}
RateLimiterController

实现排队等待

设置 QPS 为 10,那么每 100 毫秒允许通过一个,通过计算当前时间是否已经过了上一个请求的通过时间 latestPassedTime 之后的 100 毫秒,来判断是否可以通过。假设才过了 50ms,那么需要当前线程再 sleep 50ms,然后才可以通过。如果同时有另一个请求呢?那需要 sleep 150ms 才行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}

long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();

if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
WarmUpController

实现Warm Up

Warm Up,冷启动。在应用升级重启时或长时间低压力之后,应用自身需要一个预热的过程,预热之后才能到达一个稳定的性能状态,比如说,接口预热阶段完成 JIT 即时编译、完成一些单例对象的创建、线程池的创建、各种连接池的初始化、或者执行首次需要加锁执行的代码块。核心算法借鉴了Guava中SmoothWarmingUp的实现,详细分析见
https://www.javadoop.com/post/rate-limiter
https://www.jianshu.com/p/280bf2dbd6f0(推荐这篇)

在构造函数中通过double count, int warmUpPeriodInSec, int coldFactor三个参数,计算出warningToken,maxToken,slope

但是由于关注的qps所以对一些变量进行了替换

  • warmupPeriod=warmUpPeriodInSec
  • stableInterval=1/count
  • maxPermits=maxToken
  • thresholdPermits=warningToken
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}

this.count = count;

this.coldFactor = coldFactor;

// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

}

通过判断计算该点对应的qps阈值,进行判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();

long previousQps = (long) node.previousPassQps();
//设置 storedTokens 和 lastFilledTime 到正确的值,重新装填令牌桶
syncToken(previousQps);

// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}

return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}

long oldValue = storedTokens.get();
//计算新的令牌数量
long newValue = coolDownTokens(currentTime, passQps);

if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}

}

private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;

// 添加令牌的判断前提条件:
// 当令牌的消耗程度远远低于警戒线的时候
if (oldValue < warningToken) {
//按照每秒count个的速度掉落
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
//令牌消耗速率大于警戒线
} else if (oldValue > warningToken) {
//前一个bucket通过的qps小于阈值/ coldFactor,说明系统消耗令牌的速度,小于冷却速度,需要添加令牌
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
WarmUpRateLimiterController

RateLimiterController与WarmUpController的结合

通过WarmUpController中一样的算法计算出当前的qps阈值,再用该阈值去计算等待时间

DegradeSlot

实现熔断降级的切入点,功能由ProcessorSlot、CircuitBreaker、DegradeRule、DegradeRuleManager组合完成。且经过一次升级,通过状态间的自动转换避免了原版本使用定时器的缺点。

熔断策略
  • SLOW_REQUEST_RATIO:按慢请求比率
  • ERROR_RATIO:按失败比率
  • ERROR_COUNT:按失败次数

ExceptionCircuitBreaker实现根据异常比例熔断

ResponseTimeCircuitBreaker实现根据RT时间熔断

slot中通过调用两者的onRequestComplete方法统计并且判断是否熔断的逻辑

熔断器状态
  • 当熔断器状态为半开启状态时,直接拒绝请求;
  • 当熔断器为关闭状态时,请求被允许通过;
  • 当熔断器状态为开启状态时,根据 timeWindow 尝试将开关状态改为半闭合,如何修改成功,则允许当前请求通过。
来源见参考链接
详细分析

这一块的实现,内部版本与开源版本略有不同,这边的分析以开源版本为例。

核心方法如下

AbstractCircuitBreaker#tryPass

1
2
3
4
5
6
7
8
9
10
11
12
public boolean tryPass(Context context) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
//熔断器开启状态,判断是否到达可以重试时间并尝试将状态改到半开启
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}

AbstractCircuitBreaker#状态转换

基本都是使用CAS机制更改状态并且调用监听

下面展示的是解决一个bug的临时方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
// Without the hook, the circuit breaker won't recover from half-open state in some circumstances
// when the request is actually blocked by upcoming rules (not only degrade rules).
//为了解决注释中提到的问题,在发生异常后将状态改回open,以防止永久处于half-open状态的bug
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}

ExceptionCircuitBreaker#onRequestComplete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
//出现异常,增加异常计数器
counter.getErrorCount().add(1);
}
//增加总计数器
counter.getTotalCount().add(1);

handleStateChangeWhenThresholdExceeded(error);
}

private void handleStateChangeWhenThresholdExceeded(Throwable error) {
//熔断器打开直接返回
if (currentState.get() == State.OPEN) {
return;
}

//熔断器半开,本次请求如果无异常,关闭熔断器,否则重新打开熔断器
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}

//熔断器关闭状态
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
//未达到最小请求数,返回
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
//计算异常比例
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
//异常比例大于阈值,熔断器打开
if (curCount > threshold) {
transformToOpen(curCount);
}
}

ResponseTimeCircuitBreaker#onRequestComplete

整体类似上一个方法,不再赘述

一些对应关系

一个context(context name为唯一标识,context跟着线程走,切换线程需要手动切换)

  • 对应 一个调用链路 (默认值为sentinel_default_context)
  • 对应一个 entrancenode(在Context.enter时创建)

一个resource (resource name为唯一标识)

  • 对应一个责任链实例(共享同一个 NodeSelectorSlot 实例以及 ClusterBuilderSlot实例)
  • 对应一个ClusterNode(不同origin会在该ClusterNode下挂不同的数据节点)

一个DefaultNode(维持调用树状结构)

  • 对应一个context下的一个resource,即它的纬度是context*resource,存在NodeSelectorSlot的map中(key为context name)
来源见参考链接
来源见参考链接

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//规则
private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();

FlowRule rule = new FlowRule();
rule.setResource("helloAnother");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);

FlowRule rule2 = new FlowRule();
rule2.setResource("hello");
rule2.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 2.
rule2.setCount(2);

rules.add(rule);
rules.add(rule2);
FlowRuleManager.loadRules(rules);
}

//资源hello
@Override
@SentinelResource(value = "hello", fallback = "helloFallback")
public String hello(long s) {
if (s < 0) {
throw new IllegalArgumentException("invalid arg");
}
return String.format("Hello at %d", s);
}

//资源helloAnother
@Override
@SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback",
exceptionsToIgnore = {IllegalStateException.class})
public String helloAnother(String name) {
if (name == null || "bad".equals(name)) {
throw new IllegalArgumentException("oops");
}
if ("foo".equals(name)) {
throw new IllegalStateException("oops");
}
String temp=testService.hello(2);
return "Hello, " + name + " " + temp;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class App 
{

public static int count=0;
public static int interval=0;

public static void main( String[] args )
{
if (args.length==0){
interval=100;
}else {
Float qps=Float.parseFloat(args[0]);
interval=Math.round(1000/qps);
}
doTest(interval);
}

public static void doTest(int interval){
while (true){
try {
TimeUnit.MILLISECONDS.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
firstLevel();
} catch (Exception e) {
System.out.println(e.getMessage() + " block");
}
}

}

public static void firstLevel(){
ContextUtil.enter("level-test","application");
try (Entry entry = SphU.entry("firstLevel", EntryType.IN)){
if (count%(interval*5)==0){
System.out.println("firstLevel");
}
secondLevel();
} catch (BlockException e) {
throw new RuntimeException("firstLevel");
}
}

public static void secondLevel(){
ContextUtil.enter("level-test","application");
try (Entry entry = SphU.entry("secondLevel",EntryType.OUT)){
if (count%(interval*5)==0){
System.out.println("secondLevel");
count=0;
}
count++;
} catch (BlockException e) {
throw new RuntimeException("secondLevel");
}
}
}

参考链接

https://github.com/sentinel-group/sentinel-awesome

https://www.javadoop.com/post/sentinel

http://learn.lianglianglee.com/