Sentinel学习笔记:sentinel-cluster

简介

由于流量存在不均匀的情况,单机限流无法实现准确的集群限流,往往在总流量没有到达集群阈值的情况下,集群中的某些机器就开始限流。例如一个集群限流阈值为200qps,由两个节点组成,那么每个节点限流配置100qps,在这种配置下即便集群流量未到达200qps,单个节点也有可能到达100qps而开始限流。而集群限流就是为这种场景服务的,目的就是实现精确地控制整个集群的 QPS。

模块简介

sentinel-cluster-common-default:公共模块,定义通信协议,包括编码器和解码器接口、请求和响应实体(数据包),与底层使用哪种通信框架无关

sentinel-cluster-client-default:集群限流客户端模块,实现公共模块定义的接口,使用 Netty 进行通信,实现自动连接与掉线重连、提供连接配置 API

sentinel-cluster-server-default:集群限流服务端模块,实现公共模块定义的接口,使用 Netty 进行通信,同时提供扩展接口对接规则判断的具体实现(TokenService)

sentinel-cluster-server-envoy-rls:提供了Envoy 集群流量控制,使sentinel具有在Service Mesh下的流控能力

核心原理

单机限流流程

  1. FlowSlot 作为切入点,在 entry 方法中调用 FlowRuleChecker#checkFlow 方法判断是否限流;
  2. FlowRuleChecker 根据资源名称从规则管理器获取配置的限流规则,遍历限流规则;
  3. 根据限流规则的 clusterMode 决定走本地限流逻辑还是走集群限流逻辑;
  4. 如果是本地限流,则调用流量效果控制器判断是否拒绝当前请求。

由于网络延迟的存在,Sentinel 集群限流并未实现匀速排队流量效果控制,也没有支持冷启动,而只支持直接拒绝请求的流控效果。(具有优先级的流量在一定情况下可以尝试抢占之后的时间窗口的指标,见core部分DefaultController)

集群限流流程

在上述第(3)步中如判断为集群限流,则通过远程调用向集群限流服务端(TokenService)发起调用,由TokenService判断是否拒绝请求。结合令牌桶的思想,TokenService类似于令牌发放员,负责生产令牌,客户端向服务端申请令牌。

服务模式

Sentinel 集群限流客户端与集群限流服务端通信只保持一个长连接,底层通信基于 Netty 框架实现,自定义通信协议,并且数据包较小,网络 I/O 性能方面影响不大。

Sentinel 集群限流对限流服务端的可用性要求不高,当限流服务端挂掉时,可回退为本地限流。

嵌入模式

简单理解为,TokenService作为应用的内置服务同应用一起启动,可动态挑选一个节点作为TokenService(不具备类似主从自动切换的功能)

独立模式

单独部署的TokenService,可以为多个服务提供集群限流支持

整体扩展架构

来源见文末

核心类及流程

来源见文末

sentinel-core 模块的 cluster 包

定义了实现集群限流功能的相关接口

  • TokenService:定义客户端向服务端申请 token 的接口,由 FlowRuleChecker 调用
  • ClusterTokenClient:集群限流客户端需要实现的接口,继承 TokenService
  • ClusterTokenServer:集群限流服务端需要实现的接口
  • EmbeddedClusterTokenServer:支持嵌入模式的集群限流服务端需要实现的接口,继承 TokenService、ClusterTokenServer

在上图中DefaultClusterTokenClient是 sentinel-cluster-client-default 模块中的 ClusterTokenClient 接口实现类,DefaultTokenService 与 DefaultEmbeddedTokenServer 分别是 sentinel-cluster-server-default 模块中的 ClusterTokenServer 接口与 EmbeddedClusterTokenServer 接口的实现类。

当使用嵌入模式启用集群限流服务端时,使用的是 EmbeddedClusterTokenServer,否则使用 ClusterTokenServer,通过 Java SPI 实现

集群限流客户端

与单机限流的不同之处从FlowRuleChecker#canPassCheck开始,在该方法中若判断为集群限流则调用FlowRuleChecker#passClusterCheck方法

FlowRuleChecker#passClusterCheck(……)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
try {
//获取TokenService实例
TokenService clusterService = pickClusterService();
if (clusterService == null) {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
//获取集群限流规则的唯一全局id
long flowId = rule.getClusterConfig().getFlowId();
//申请令牌
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
//根据结果判断是否拒绝请求
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
}
// Fallback to local flow control when token client or server for this rule is not available.
// If fallback is not enabled, then directly pass.
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

pickClusterService()

如果当前节点是客户端角色,获取ClusterTokenClient实例如果当前节点是服务端角色(嵌入式),获取EmbeddedClusterTokenServer实例

requestToken(flowId, acquireCount, prioritized)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
//检查输入有效性
if (notValidRequest(flowId, acquireCount)) {
return badRequest();
}
//构造请求体
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setPriority(prioritized);
//构造请求
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
//发送请求,通过netty建立的连接发送
TokenResult result = sendTokenRequest(request);
logForResult(result);
return result;
} catch (Exception ex) {
ClusterClientStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}

applyTokenResult(result, rule, context, node, acquireCount, prioritized)

  • 当响应状态码为 OK 时放行请求;
  • 当响应状态码为 SHOULD_WAIT 时,休眠指定时间再放行请求;
  • 当响应状态码为 BLOCKED,直接拒绝请求;
  • 其它状态码均代表调用失败,根据规则配置的 fallbackToLocalWhenFail 是否为 true,决定是否回退为本地限流,如果需要回退为本地限流模式,则调用 passLocalCheck 方法重新判断。

集群限流服务端

从客户端发来的requestToken请求最后交由com.alibaba.csp.sentinel.cluster.flow.DefaultTokenService#requestToken方法处理

DefaultTokenService#requestToken(Long ruleId, int acquireCount, boolean prioritized)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
//验证规则有效性
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
// The rule should be valid.
//获取对应规则
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
//获取令牌
return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}

ClusterParamFlowChecker#acquireClusterToken

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
Long id = rule.getClusterConfig().getFlowId();
//命名空间纬度的全局阈值
if (!allowProceed(id)) {
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
}
//获取数据统计滑动窗口
ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}
//获取已通过数
double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
//计算阈值
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
//剩余通过数=阈值-已通过数-请求数
double nextRemaining = globalThreshold - latestQps - acquireCount;
//剩余通过数>0直接放行
if (nextRemaining >= 0) {
// TODO: checking logic and metric operation should be separated.
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
} else {
//剩余通过数不足,但是是优先流量,允许占用之后指标,类似core部分DefaultControlle
if (prioritized) {
// Try to occupy incoming buckets.
//获取已经占用数
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
//已经占用数小于允许占用的最大值(比例*阈值)
if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
//计算占用后等待时间
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
// waitInMs > 0 indicates pre-occupy incoming buckets successfully.
if (waitInMs > 0) {
ClusterServerStatLogUtil.log("flow|waiting|" + id);
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
// Or else occupy failed, should be blocked.
}
}
// Blocked.
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
if (prioritized) {
// Add prioritized block.
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}

return blockedResult();
}
}

ClusterParamFlowChecker#calcGlobalThreshold

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static double calcGlobalThreshold(ParamFlowRule rule, Object value) {
//获取阈值
double count = getRawThreshold(rule, value);
//当阈值类型为集群总 QPS 时,直接使用限流规则的阈值(count);
//当阈值类型为单机均摊时,根据规则 ID 获取当前连接的客户端总数,将当前连接的客户端总数乘以限流规则的阈值(count)作为集群总 QPS 阈值。
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
int connectedCount = ClusterParamFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
return count * connectedCount;
}
}

ClusterMetric#tryOccupyNext

简化版的DefaultControlle,只尝试占用下一个样本窗口的指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
//获取当前窗口通过数
double latestQps = getAvg(ClusterFlowEvent.PASS);
//判断能否占用之后窗口
if (!canOccupy(event, acquireCount, latestQps, threshold)) {
return 0;
}
metric.addOccupyPass(acquireCount);
add(ClusterFlowEvent.WAITING, acquireCount);
return 1000 / metric.getSampleCount();
}

private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
//获取未来窗口的通过数
long headPass = metric.getFirstCountOfWindow(event);
//获取未来窗口占用数
long occupiedCount = metric.getOccupiedCount(event);
// bucket to occupy (= incoming bucket)
// ↓
// | head bucket | | | | current bucket |
// +-------------+----+----+----+----------- ----+
// (headPass)
//判断当前窗口已通过数+(请求数+未来窗口被占用数)-未来窗口通过数(重复计算所以减去)是否小于阈值
return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
}

滑动窗口

集群限流使用的滑动窗口并非 sentinel-core 模块下实现的滑动窗口,而是 sentinel-cluster-server-default 模块自己实现的滑动窗口

实现集群限流需要收集的指标数据有以下几种:

1
2
3
4
5
6
7
8
9
public enum ClusterFlowEvent {
PASS,
BLOCK,
PASS_REQUEST,
BLOCK_REQUEST,
OCCUPIED_PASS,
OCCUPIED_BLOCK,
WAITING
}
  • PASS:已经发放的令牌总数
  • BLOCK:令牌申请被驳回的总数
  • PASS_REQUEST:被放行的请求总数
  • BLOCK_REQUEST:被拒绝的请求总数
  • OCCUPIED_PASS:预占用,已经发放的令牌总数
  • OCCUPIED_BLOCK:预占用,令牌申请被驳回的总数
  • WAITING:当前等待下一个时间窗口到来的请求总数

除统计的指标项与 sentinel-core 包下实现的滑动窗口统计的指标项有些区别外,实现方式都一致。

总结

集群限流并非解决请求倾斜问题,在请求倾斜严重的情况下,集群限流可能会导致某些节点的流量过高,导致系统的负载过高,这时就需要使用系统自适应限流、熔断降级作为兜底解决方案。

参考资料

http://learn.lianglianglee.com/%E4%B8%93%E6%A0%8F/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3%20Sentinel%EF%BC%88%E5%AE%8C%EF%BC%89/18%20Sentinel%20%E9%9B%86%E7%BE%A4%E9%99%90%E6%B5%81%E7%9A%84%E5%AE%9E%E7%8E%B0%EF%BC%88%E4%B8%8A%EF%BC%89.md

https://github.com/alibaba/Sentinel/wiki/%E9%9B%86%E7%BE%A4%E6%B5%81%E6%8E%A7