使用Spark Streaming实施分布式采集系统

优采云 发布时间: 2020-08-07 10:10

  在我在微信矩中发布段落之前,解释了Spark Streaming不仅是流计算,而且是一种通用模型,使您可以专注于业务逻辑而无需关注分布式相关问题并快速解决业务问题.

  前言

  两天前,我刚刚在一篇文章中讲道,数据本质上是流式传输,并指出:

  批处理计算一直在缓慢地退化,未来必须属于流计算. 数据流必须由数据本身驱动.

  就高层概念而言,Spark Streaming完美地集成了批处理和流计算,使它们能够在您和我之间拥有我. 这种设计使Spark Streaming成为流计算的载体,并且还可以为需要分布式体系结构的其他问题提供解决方案.

  Spark Streaming作为某些分布式任务系统的基础的优势

  它自然是分布式的,因此无需担心实现分布式协调

  基于任务的任务执行机制,任务数可以随意控制

  无需关注机器,它是面向资源的,使得部署极其简单,声明资源,提交,结束

  完全集成的输入和输出,包括HDFS / Kafka / ElasticSearch / HBase / MySQL / Redis等,

  成熟而简单的运算符使您的数据处理变得极其简单

  StreamingPro项目简化了声明性或复杂的Spark Streaming程序,同时,它还可以通过StreamingPro提供的Rest接口增强Spark Streaming Driver的交互功能.

  现在以标题中的采集统为例. 您只需要为整个事情实现采集逻辑. 至于特定的元数据读取,只要简单的配置或使用现成的组件,结果就可以存储在任何地方,并且最终部署只需要简单的声明即可. 可以将资源放置在可以灵活扩展的群集上.

  有关此作品的概念,请参阅

  开发采集系统的动机

  当前,此采集系统主要用于监视. 每当公司或部门拥有大量开源系统时,每个开源组件将提供大约三种类型的输出:

  标准指标输出,方便您集成到神经节等监控系统中

  Spark,Storm,HBase等Web UI均提供自己的Web界面等.

  Rest接口,主要是JSon,XML,字符串

  但是对于监视来说,前两个是直观且易于使用的,但是它们也存在很大的问题:

  度量标准直接输出到监视系统意味着无法自定义它. 如果我想将多个指标放在一起,这可能很难实现.

  Web UI需要有人看到它

  相反,Rest接口是最灵活的,但是您需要做自己的写逻辑,例如获取数据,处理然后做自己的演示. 问题是,如果我要获取数千个Rest接口数据,并且需要一种非常方便的方法来提取所需的值(或指标). 这涉及两个问题:

  可能有很多采集接口. 如何使采集程序在水平方向上可扩展?

  接口返回的数据具有不同的形式. 如何提供一个方便且一致的模型,允许用户通过简单的配置提取内容?

  系统处理结构

  QQ20160529-1@2x.png

  一般信息提取方案

  回到上面的问题,

  接口返回的数据具有不同的形式. 如何提供一个方便且一致的模型,以便用户可以通过简单的配置提取内容

  Rest接口返回的数据仅是四种类型:

  HTML

  JSON

  XML

  TEXT

  对于1,我们暂不讨论. 对于JSON,XML,我们可以使用XPATH,对于TEXT,我们可以使用标准的常规或ETL进行提取.

  当我们定义要采集的URL时,我们需要同时配置要采集的索引和相应索引的XPATH路径或规则性. 当然,逻辑也可以由后端的ETL完成. 但是,现在我们已经将Spark Streaming用作采集系统,我们自然可以使用其强大的数据处理功能来完成必要的格式化操作. 因此,我们建议直接在采集系统中进行此操作.

  采集系

  数据源的可能数据结构:

   appName      采集的应用名称,cluster1,cluster2

 appType       采集的应用类型,storm/zookeeper/yarn 等

 url                需要采集的接口

 params         可能存在例如post请求之类的,需要额外的请求参数

 method         Get/POST/PUT 等请求方法体

 key_search_qps :  $.store.book[0].author   定义需要抽取的指标名称以及在Response 对应的XPATH 路径

 key_.....  可以有更多的XPATH

 key_.....  可以有更多的XPATH

 extraParams  人工填写一些其他参数

  获取系统通过我们打包的DInputStream获取这些数据,然后根据批处理(计划周期)获取这些数据,然后将其传递给特定的执行逻辑以进行执行. 使用StreamingPro,将像这样:

  "RestCatch": {    "desc": "RestCatch",    "strategy": "....SparkStreamingStrategy",    "algorithm": [],    "ref": [],    "compositor": [

      {        "name": "....ESInputCompositor",        "params": [

          {            "es.nodes": "....",            "es.resource": "monitor_rest/rest"

          }

        ]

      },

      {        "name": ".....RestFetchCompositor",//发起http请求,获取response

        "params": [

          {            "resultKey": "result",            "keyPrefix": "key_"

          }

        ]

      },

      {        "name": "....JSonExtractCompositor",//根据XPath获取response里的指标

        "params": [

          {            "resultKey": "result",            "keyPrefix": "key_"

          }

        ]

      },

      {        "name": ".....ConsoleOutputCompositor",//输出结果

        "params": []

      }

    ],    "configParams": {

    }

  }

  通过上述配置文件,您可以很好地看到处理流程.

  输入采集来源

  采集结果

  根据XPATH提取指标

  输出结果

  创建元数据管理系统

  元数据管理系统是必需的,它可以帮助您添加新的URL监视项目. 使用StreamingPro,您可以将元数据管理页面添加到Spark Streaming的驱动程序,以实现元数据操作逻辑. 将来,我们将提供更好的教程,介绍如何通过StreamingPro将自定义的Rest界面/网页添加到Spark Streaming.

  结束了吗?

  以上实际上是采集系统的原型. 得益于Spark Streaming的自然分布和灵活的运算符,我们的系统足够灵活,可以水平缩放.

  但是,您会发现

  如果每个接口需要不同的采集期限怎么办?

  如果我想获得更好的容错能力?

  如何实现更好的动态扩展?

  第一个问题很好地解决了. 我们在元数据中定义采集周期,并将Spark Streaming的调度周期设置为最小粒度.

  第二个问题是业务级别的容错能力,但是如果任务失败,Spark Streaming将尝试重新计划并重试. 我们建议您自己做.

  第三,只要打开动态资源分配,就可以根据情况扩展和利用资源.

  作者: William Zhu

  链接:

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线