源码剖析 Sentinel 实时数据采集实现原理

优采云 发布时间: 2020-08-27 01:03

  源码剖析 Sentinel 实时数据采集实现原理

  本篇将重点关注 Sentienl 实时数据搜集,即 Sentienl 具体是怎样搜集调用信息,以此来判定是否须要触发限流或熔断。

  本节目录

  Sentienl 实时数据搜集的入口类为 StatisticSlot。

  我们先简单来看一下 StatisticSlot 该类的注释,来看一下该类的整体定位。

  StatisticSlot,专用于实时统计的 slot。在步入一个资源时,在执行 Sentienl 的处理链条中会步入到该 slot 中,需要完成如下估算任务:

  接下来用源码剖析的手段来详尽剖析 StatisticSlot 的实现原理。

  1、源码剖析 StatisticSlot 1.1 StatisticSlot entry 详解

  StatisticSlot#entry

  public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {

try {

// Do some checking.

fireEntry(context, resourceWrapper, node, count, prioritized, args); // @1

// Request passed, add thread count and pass count.

node.increaseThreadNum(); // @2

node.addPassRequest(count);

if (context.getCurEntry().getOriginNode() != null) { // @3

// Add count for origin node.

context.getCurEntry().getOriginNode().increaseThreadNum();

context.getCurEntry().getOriginNode().addPassRequest(count);

}

if (resourceWrapper.getEntryType() == EntryType.IN) { // @4

// Add count for global inbound entry node for global statistics.

Constants.ENTRY_NODE.increaseThreadNum();

Constants.ENTRY_NODE.addPassRequest(count);

}

// Handle pass event with registered entry callback handlers.

for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { // @5

handler.onPass(context, resourceWrapper, node, count, args);

}

} catch (PriorityWaitException ex) { // @6

node.increaseThreadNum();

if (context.getCurEntry().getOriginNode() != null) {

// Add count for origin node.

context.getCurEntry().getOriginNode().increaseThreadNum();

}

if (resourceWrapper.getEntryType() == EntryType.IN) {

// Add count for global inbound entry node for global statistics.

Constants.ENTRY_NODE.increaseThreadNum();

}

// Handle pass event with registered entry callback handlers.

for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {

handler.onPass(context, resourceWrapper, node, count, args);

}

} catch (BlockException e) { // @7

// Blocked, set block exception to current entry.

context.getCurEntry().setError(e);

// Add block count.

node.increaseBlockQps(count);

if (context.getCurEntry().getOriginNode() != null) {

context.getCurEntry().getOriginNode().increaseBlockQps(count);

}

if (resourceWrapper.getEntryType() == EntryType.IN) {

// Add count for global inbound entry node for global statistics.

Constants.ENTRY_NODE.increaseBlockQps(count);

}

// Handle block event with registered entry callback handlers.

for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {

handler.onBlocked(e, context, resourceWrapper, node, count, args);

}

throw e;

} catch (Throwable e) { // @8

// Unexpected error, set error to current entry.

context.getCurEntry().setError(e);

// This should not happen.

node.increaseExceptionQps(count);

if (context.getCurEntry().getOriginNode() != null) {

context.getCurEntry().getOriginNode().increaseExceptionQps(count);

}

if (resourceWrapper.getEntryType() == EntryType.IN) {

Constants.ENTRY_NODE.increaseExceptionQps(count);

}

throw e;

}

}

  代码@1:首先调用 fireEntry,先调用 Sentinel Slot Chain 中其他的处理器,执行完其他处理器的逻辑,例如 FlowSlot、DegradeSlot,因为 StatisticSlot 的职责是搜集统计信息。

  代码@2:如果后续处理器成功执行,则将正在执行线程数统计指标加一,并将通过的恳求数目指标降低对应的值。下文会对 Sentinel Node 体系进行详尽的介绍,在 Sentinel 中使用 Node 来表示调用链中的某一个节点,每个节点关联一个资源,资源的实时统计信息就存贮在 Node 中,故该部份也是调用 DefaultNode 的相关技巧来改变线程数等,将在下文会向详尽介绍。

  代码@3:如果上下文环境中保存了调用的源头(调用方)的节点信息不为空,则更新该节点的统计数据:线程数与通过数目。

  代码@4:如果资源的步入类型为 EntryType.IN,表示入站流量,更新入站全局统计数据(集群范围 ClusterNode)。

  代码@5:执行注册的步入Handler,可以通过 StatisticSlotCallbackRegistry 的 addEntryCallback 注册相关*敏*感*词*器。

  代码@6:如果捕获到 PriorityWaitException ,则觉得是等待过一定时间,但最终还是算通过,只需降低线程的个数,但无需降低节点通过的数目,具体缘由我们在详尽剖析限流部份时会重点讨论,也会再度探讨 PriorityWaitException 的含意。

  代码@7:如果捕获到 BlockException,则主要降低阻塞的数目。

  代码@8:如果是系统异常,则降低异常数目。

  我想里面的代码应当不难理解,但涉及到统计指标数据的变化,都是调用 DefaultNode node 相关的方式,从这儿也可以看出,Node 将是实时统计数据的直接持有者,那毋容置疑接下来将重点来学习 Node,为了知识体系的完备性,我们先来看一下 StatisticSlot 的 exit 方法。

  1.2 StatisticSlot exit 详解

  StatisticSlot#exit

  public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {

DefaultNode node = (DefaultNode)context.getCurNode();

if (context.getCurEntry().getError() == null) { // @1

// Calculate response time (max RT is TIME_DROP_VALVE).

long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();

if (rt > Constants.TIME_DROP_VALVE) {

rt = Constants.TIME_DROP_VALVE;

}

// Record response time and success count.

node.addRtAndSuccess(rt, count);

if (context.getCurEntry().getOriginNode() != null) {

context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);

}

node.decreaseThreadNum();

if (context.getCurEntry().getOriginNode() != null) {

context.getCurEntry().getOriginNode().decreaseThreadNum();

}

if (resourceWrapper.getEntryType() == EntryType.IN) {

Constants.ENTRY_NODE.addRtAndSuccess(rt, count);

Constants.ENTRY_NODE.decreaseThreadNum();

}

} else {

// Error may happen.

}

// Handle exit event with registered exit callback handlers.

Collection exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();

for (ProcessorSlotExitCallback handler : exitCallbacks) { // @2

handler.onExit(context, resourceWrapper, count, args);

}

fireExit(context, resourceWrapper, count); // @3

}

  代码@1:成功执行,则重点关注响应时间,其实现亮点如下:

  计算本次响应时间,将本次响应时间搜集到 Node 中。

  将当前活跃线程数减一。

  代码@2:执行退出时的 callback。可以通过 StatisticSlotCallbackRegistry 的 addExitCallback 方法添加退出回调函数。

  代码@3:传播 exit 事件。

  接下来我们将重点介绍 DefaultNode,即 Sentinel 的 Node 体系,持有资源的实时调用信息。

  2、Sentienl Node 体系

  2.1 Node 类体系图

  

  我们先简单介绍一下上述核心类的作用与核心插口或核心属性的含意。

  long waiting()

  获取当前已申请的未来的令牌的个数。 void addWaitingRequest(long futureTime, int acquireCount)

  申请未来时间窗口中的令牌。 void addOccupiedPass(int acquireCount)

  增加申请未来令牌通过的个数。 double occupiedPassQps()

  当前占据未来令牌的QPS。 Node

  持有实时统计信息的节点。定义了搜集统计信息与获取统计信息的插口,上面方式依据技巧名称即可获知其涵义,故这儿就不一一列举了。 StatisticNode

  实现统计信息的默认实现类。 DefaultNode

  用于在特定上下文环境中保存某一个资源的实时统计信息。 ClusterNode

  实现基于集群限流模式的节点,将在集群限流模式部份详尽介绍。 EntranceNode

  用来表示调用链入口的节点信息。

  本文将详尽介绍 DefaultNode 与 StatisticNode,重点论述调用树与实时统计信息。DefaultNode 是 StatisticNode 的泛型,我们先从 StatisticNode 开始 Node 体系的探究。

  2、StatisticNode 详解 2.1 核心类图

  

  我们对其核心属性进行一一详解:

  关于 ArrayMetric 滑动窗口设计与实现原理,请参考笔者的另一篇博文:Alibaba Seninel 滑动窗口实现原理(文末附原理图)

  接下来我们选购几个具有代表性的方式进行探究。

  2.2 addPassRequest

  public void addPassRequest(int count) {

rollingCounterInSecond.addPass(count);

rollingCounterInMinute.addPass(count);

}

  增加通过恳求数目。即将实时调用信息向滑动窗口中进行统计。addPassRequest 即报告成功的通过数目。就是分别调用 秒级、分钟即对应的滑动窗口中添加数目,然后限流规则、熔断规则将基于滑动窗口中的值进行估算。

  2.3 totalRequest

  public long totalRequest() {

return rollingCounterInMinute.pass() + rollingCounterInMinute.block();

}

  获取当前时间戳的总恳求数,获取分钟级时间窗口中的统计信息。

  2.4 successQps

  public double successQps() {

return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();

}

  成功TPS,用秒级统计滑动窗口中统计的个数 除以 窗口的间隔得出其 tps,即抽样个数越大,其统计越精确。

  温馨提示:上面的方式在学习了上文的滑动窗口设计原理后将变得十分简单,大家在学习的过程中,可以总结出一个规律,什么时候时侯使用秒级滑动窗口,什么时候使用分钟级滑动窗口。

  2.5 metrics

  由于 Sentienl 基于滑动窗口来实时搜集统计信息,并储存在显存中,并随着时间的推移,旧的滑动窗口将失效,故须要提供一个方式,及时将所有的统计信息进行汇总输出,供监控客户端定时拉取,转储都其他客户端,例如数据库,方便监控数据的可视化,这也一般是中间件用于监控指标的监控与采集的通用设计方式。

  public Map metrics() {

long currentTime = TimeUtil.currentTimeMillis();

currentTime = currentTime - currentTime % 1000; // @1

Map metrics = new ConcurrentHashMap();

List nodesOfEverySecond = rollingCounterInMinute.details(); // @2

long newLastFetchTime = lastFetchTime;

// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).

for (MetricNode node : nodesOfEverySecond) {

if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) { // @3

metrics.put(node.getTimestamp(), node);

newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());

}

}

lastFetchTime = newLastFetchTime;

return metrics;

}

  代码@1:获取当前时间对应的滑动窗口的开始时间,可以对比上文估算滑动窗口的算法。

  代码@2:获取一分钟内的所有滑动窗口中的统计数据,使用 MetricNode 表示。

  代码@3:遍历所有节点,刷选出不是当前滑动窗口外的所有数据。这里的重点是方式:isNodeInTime。

<p>private boolean isNodeInTime(MetricNode node, long currentTime) {

return node.getTimestamp() > lastFetchTime && node.getTimestamp()

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线