解决方案:实时计算系列(3) - 规则引擎和 Flink CEP

优采云 发布时间: 2022-12-06 07:24

  解决方案:实时计算系列(3) - 规则引擎和 Flink CEP

  复杂事件处理(CEP),在企业内部实践中,常被称为规则引擎。随着实时数仓的发展,CEP将成为众多实时计算相关团队的又一主要发展方向。

  如果你对实时计算感兴趣,欢迎阅读其他文章:

  什么是 CEP?

  CEP 是 Complex Event Processing 的缩写。区分这类事件处理的核心原因是计算范式比普通的实时计算更“复杂”。这种复杂性不在业务逻辑上,而在技术上。不同的计算范式,示例如下:

  CEP本身并没有脱离实时计算的范围,所以大部分用户还是选择基于Flink或者已有的计算服务来构建相关框架。CEP对应的函数以库的形式存在。而且,从上面的例子可以看出,这些场景在业务中是非常常用的。如果定制一个或几个需求来解决某个需求,大多数工程师肯定觉得没有问题。

  通用系统架构

  然而,实际情况往往不是写几条SQL,几行代码那么简单。对于大多数CEP应用场景,“复杂规则”的制定者通常是运营、商户、市场等非技术类的学生。对于大多数CEP的业务效果而言,通常是在短时间内直接到达用户,比如发送优惠券、发送推送消息等。

  这种实时计算脱离了以往的BI场景,与真实的业务效果挂钩。这也是导致系统非常复杂的一个重要因素。所以很多企业将这个系统抽象成一个规则引擎服务来完成。

  规则引擎服务的架构通常如下图所示:

  CEP-架构实施困难

  由于不同于BI场景,规则引擎的输出与用户终端的性能直接挂钩,实现上比一般的实时数仓场景更加严谨,具体体现在:

  组件复杂度高:以上述架构图为例,进入CEP的数据流多种多样,可能存在窗口计算、多流Join等复杂处理。CEP规则引擎输出的数据需要经过各种校验、底线等处理逻辑。就平台而言,一个完整的、真正可用的平台,需要收录从规则配置到最终ROI计算的交付闭环。

  离线不一致:CEP规则引擎属于在线计算。优点是延迟高,缺点是数据的输出与事件的先后顺序强相关。即使开发者使用eventtime,也会面临事件时间超过watermark而被丢弃的问题。如果事后有相关反馈,将时序相关的计算逻辑引入到离线计算中会是一个很复杂的问题,而且即使计算正确,也不一定和当时的线上任务完全一致,比如job消息积压,客户端延迟发送会导致数据准确性问题。

  准确性验证:以发放优惠券或广告为例。这样的行为最终会被用于ROI的计算,所以每个规则的触发都需要保证准确性,并且有一定的“后台”措施。常见的自下而上措施包括频率控制、为指定规则设置最大触发值等。

  Flink 中的 CEP

  CEP 在 Flink 中以库的形式存在,不与其底层引擎代码绑定。它只是继承了许多低级API。在阅读cep代码的过程中,你还可以学到很多Flink的新奇使用方式。我们可以简单的将Flink内部的CEP实现分为以下几个步骤:

  规则分析

  Flink 中的 CEP 借鉴了 Efficient Pattern Matching over Event Streams 中的 NFA 模型。这篇论文中也提到了一些内存优化,我们这里略过。

  文中提到NFA,即Non-determined Finite Automaton,称为不确定有限状态机,意思是状态是有限的,但每个状态都可能转化为多个状态(不确定)。

  下面以一个简单的CEP规则为例,看看NFA中这些事件之间有什么样的关系,

  Pattern pattern = Pattern.begin("begin").where(new SimpleCondition() {

@Override

public boolean filter(Event value) throws Exception {

return value.getName().equals("a");

}

}).followedBy("middle").where(new SimpleCondition() {

@Override

public boolean filter(Event value) throws Exception {

return value.getName().equals("b");

}

}).followedBy("end").where(new SimpleCondition() {

@Override

<p>

public boolean filter(Event value) throws Exception {

return value.getName().equals("c");

}

});</p>

  规则如上,很明显我们要找的是a-&gt;b-&gt;c这样的事件组合,对应NFA内部,会根据这个事件关系生成状态转移图,大体逻辑如下:

  cep-nfa

  每个节点对应规则匹配过程中的一个状态。例如,“开始”节点是初始化状态。在接收到value="a"的数据之前,匹配会一直处于"begin"状态;每条Edges对应状态之间的转移条件,例如value="a"的数据满足从"begin"到"middle"的转移条件。节点的概念更容易理解。这是边缘类型的抽象:

  规则匹配

  规则解析后生成NFA,接下来就是接收具体的数据,然后进行匹配过程。中间状态的存储在匹配过程中非常重要,即如何存储当前的匹配进度。NFA中使用了ShareBuffer的概念。我们可以在 Flink 中自定义一个 State 来存储细节。还是以上面的a-&gt;b-&gt;c为例,假设事件的输入是a1,b1,c1,那么就会出现a1-&gt;b1-&gt;c1的匹配结果A,*敏*感*词*如下:

  cep匹配

  上面的例子很简单,这里我们期望把情况复杂化,我们输入a1,a2,b1,b2,c1,那么此时算子会输出4个结果:

  a1-&gt;b1-&gt;c1a1-&gt;b2-&gt;c1a2-&gt;b1-&gt;c1a2-&gt;b2-&gt;c2

  可以看出,四个输出序列都符合CEP规则。我们同时在 NFA 图上进行了多次匹配。这是如何实现的?参考如下伪代码逻辑,每条记录:

  for state in partialStates: // 遍历正在匹配中的状态

for edge in state.edges: // 遍历状态的边,逐一检查是否满足条件

if match: // 如果满足,状态发生转移

partialStates.remove(state)

newState = state.transTo(edge.targetState)

partialStates.add(newState)

// 如果初始化状态发生了转化,新增一个初始化状态,准备新的一次匹配

if not partialStates.contains(beginState):

partialStates.add(beginState)

  另外,我们没有单独存储每个序列,而是在每个状态节点下创建一个List,并使用前向指针来描述每个事件之间的关系,从而在内存中复用每个事件进行存储,关于ShareBuffer我们会在“匹配事件提取”的过程。

  接下来说说稍微复杂一点的匹配情况。在业务场景中,通常规则的制定都会有一个时间窗口(否则Flink会一直匹配),比如某天A事件先发生,B事件在后发生:

  Pattern pattern = Pattern.begin("begin").where(new SimpleCondition() {

@Override

public boolean filter(Event value) throws Exception {

return value.getName().equals("a");

}

}).followedByAny("middle").where(new SimpleCondition() {

<p>

@Override

public boolean filter(Event value) throws Exception {

return value.getName().equals("b");

}

}).within(Time.days(1));</p>

  这里,within(Time)用于标识整个序列的匹配时间窗口。注意这个和Flink Window使用的自然时间是不一样的。这里的窗口是由序列的第一个匹配事件触发的,比如在18:02匹配到第一个事件,则窗口结束时间为次日18:02。Flink 通过在 CEP 算子中注册 Timer 来实现这一机制。当第一次匹配事件完成后,注册结束时间对应的Timer,并保存startTimestamp(第一次匹配事件的时间戳),Timer会在第二天触发。遍历所有匹配的状态,如果匹配到currentTime &gt; startTimestamp + 1day,则执行相应的超时处理逻辑(用户可自定义)。

  Flink 在 CEP 算子中定义了丰富的匹配语义,这里就不一一列举了。实现的语义细节可以参考:/flink/flink-docs-master/docs/libs/cep/,由于Flink对实时计算功能的要求实现非常丰富,所以CEP的实现确实不超过 Flink 作为实时计算引擎本身的能力。

  匹配事件提取

  完成匹配过程之后,接下来就是如何提取匹配的事件列表,或者以上述规则a-&gt;b-&gt;c为例,当事件匹配到Output阶段时,Flink需要做什么do是列出匹配到的事件的输出,其对应的UserAPI接口如下:

  class MyPatternProcessFunction extends PatternProcessFunction {

@Override

public void processMatch(Map match, Context ctx, Collector out) throws Exception;

IN startEvent = match.get("start").get(0);

IN endEvent = match.get("end").get(0);

out.collect(OUT(startEvent, endEvent));

}

}

  这里Map&gt;match表示匹配成功,Map的Key表示状态节点的名称,List表示每个状态节点对应的事件列表。这就涉及到一个问题。当同时有多个匹配时,Flink 是如何确定输出哪些事件列表的?

  上文提到,Flink 在 NFA 的每个状态节点下创建一个 List,并使用前向指针来描述每个事件之间的关系,从而实现对每个事件的复用。这样的关系图看起来有点乱。我们需要一个版本来识别每条边的方向。这也是基于NFA论文中ShareBuffer的思想。Flink 赋予了每条边一个版本的概念,这样在输出的时候就可以根据版本追溯匹配的路径。这是目前在 Flink 中完成的:

  杜威

  上图的匹配情况(期望匹配a-&gt;多个b-&gt;c)就是一个例子。对于每一个元素,都会有一条边指向相连的元素,通过版本号的前缀来判断兼容性,比如1.0。0兼容1.0,1.0.1.0兼容1.0.1。匹配完成后,从最后一个元素开始向前遍历,得到一个完整的列表。生成版本号时,根据状态转换的次数来确定。比如图中中间状态的b1元素,当接收到b2事件时,会发生两次状态转换,一是满足从middle到end的转换条件, From middle to end,二是保存到当前中间,匹配多个b事件;

  这里Flink的内部实现与论文中NFA的ShareBuffer有些不同。在论文中,考虑了更多具有多个规则的场景。*敏*感*词*如下:

  杜威

  论文中版本号的长度代表状态节点的路径长度,然后通过路径中的分支数来升级版本号。比如上图中的e5节点,有一个fork,所以边缘版本e6-&gt;e5从1.0升级到1.1,兼容规则是1.1向下兼容当前路径长度,例如, 1.1 与 1.0 兼容。详细原理可以参考论文,这里不再赘述。

  存在的问题

  Flink 基于NFA 的CEP 算子实现整体上还是比较完善的,但是如前所述,CEP 的应用场景通常比较复杂,稍微大一点的场景很难直接基于开源实现来应用。这里有些例子:

  其他 CEP 引擎

  我们可以顺便了解一下其他的一些CEP引擎,比如siddhi,目前做的比较好,但是siddhi的定位是嵌入式流计算框架,有自己的一套语法和用法,也有一定的用户量。但如果用户选择siddhi,则需要自己完成分布式部署(可能使用Kubernetes会很方便),并且有两个流计算技术栈(Flink和siddhi)。当然,陈昊将siddhi和Flink结合起来,还有一个flink-siddhi项目,有兴趣的可以看看。

  总结

  本文阐述了规则引擎的系统架构,详细阐述了Flink CEP的内部实现原理。关于CEP未来的应用前景,我认为随着现在实时数仓的普及,很多公司会把实时计算从传统的BI报表场景演进到越来越复杂的场景,CEP也将会是广泛使用的场景之一。

  但是,如上所述,规则引擎本身就有一个完整的体系。目前观察到的CEP引擎的选型,通常采用Flink+自定义算子(CEP或者根据业务场景定义),以及基于在线服务+在线存储来自定义实现规则引擎,无论哪种方式,架构师要花费大量精力去设计一个完整的端到端链路,这也说明了这方面现有的基础设施和开源项目基础都非常缺失,期待更加专业和未来会出现系统性的项目。

  总结:文章相似度检测工具在线检测_检测两篇文章的重复率

  好的回答者:Sail

  PaperFree免费论文检测软件——全球首款免费论文相似度检测系统;提供免费论文抄袭检查、免费论文检测、免费毕业论文抄袭检测。最权威、最科学、最受学生欢迎的免费检测系统。文章赵耀静:是一款文章原创学位在线检测工具,文章是否是原创,基于文章发布时间:同一篇文章文章,发布时间越早收录,越容易被搜索引擎认为是原创文章 通过搜索引擎。文本顺序:如果两个。

  ---------------------------------------------- --------------

  受访者:朱育爱

  

  文章Demon Mirror: House of Cards文章论文反抄袭工具,使用搜索引擎搜索文章或论文中的句子,分析文章或每一句的相似度论文的原创进行文章相似度检测,如果没有原创,给抄袭的论文相似度在线检测工具 推荐你两个大学常用的recheck服务:iThenticate subject to most high impact Factor Journals 采用 PlagScan,它为每年超过 5000 万份文件的重复率提供了清晰、易于理解的报告。

  扩展信息:

  1.查看两篇文章文章的重复率

  2.两个文章相似度检测工具

  

  3.在线生成原创文章

  4.文章魔镜免费检查

  5. 麒麟原创度在线检测

  目前的文章相似度检测工具,我主要使用蚂蚁小二检测,这是一款一键分发,多账号管理的工具。主要是免费的,适合自媒体像我这样贫民窟&gt;的人,现在可以分发30多个网页文本 1:网页文本 2:页面相似度: 页面相似度检测:网页相似度检测 通过对比网页文本来检测相似度。页面相似性检测工具:页面相似性检测文章相似性检测原创文章伪原创文章文章。

  参考链接:

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线