完整的采集神器(袋鼠云研发手记:第五期和实时采集袋鼠云云引擎团队)

优采云 发布时间: 2021-10-07 13:23

  完整的采集神器(袋鼠云研发手记:第五期和实时采集袋鼠云云引擎团队)

  袋鼠云研发笔记

  作为一家创新驱动的科技公司,袋鼠云每年研发投入数千万,公司员工80%为技术人员,()、()等产品不断迭代。在产品研发的过程中,技术兄弟可以文武兼备,在不断提升产品性能和体验的同时,也记录了这些改进和优化的过程,现记录在“袋鼠云研发笔记”栏目,以跟上行业的步伐。童鞋分享交流。

  袋鼠云堆栈引擎团队

  袋鼠云数据栈引擎团队拥有一批专家级、经验丰富的后端开发工程师,支持公司大数据栈产品线不同子项目的开发需求。FlinkX(基于Flink的数据同步)从项目中提取并开源。)、Jlogstash(java版logstash的实现)、FlinkStreamSQL(扩展原生FlinkSQL,实现流维表的join)多个项目。

  在长期的项目实践和产品迭代过程中,团队成员在Hadoop技术栈上不断探索和探索,积累了丰富的经验和最佳实践。

  第五期

  FlinkX中断点续传与实时传输详解采集

  袋鼠云是原生的一站式数据中心PaaS-数据栈,涵盖了数据中心建设过程中所需的各种工具(包括数据开发平台、数据资产平台、数据科学平台、数据服务引擎等),完整覆盖范围 离线计算和实时计算应用帮助企业大大缩短数据价值的提取过程,提高数据价值的提取能力。

  

  数据栈架构图 目前,数据栈-离线开发平台(BatchWorks)中的数据离线同步任务和数据栈-实时开发平台(StreamWorks)中的数据实时同步任务已经基于FlinkX进行了统一。离线数据采集和实时采集的基本原理是一样的。主要区别在于源流是否有界,所以使用 Flink 的 Stream API 来同步这两种数据。场景,实现数据同步的批量流程统一。

  1

  特征

  http

  可续传是指数据同步任务在运行过程中由于各种原因而失败。无需重新同步数据。您只需要从上次失败的位置继续同步。类似于网络原因下载文件失败。无需再次下载文件,只需继续下载,可大大节省时间和计算资源。可续传是数据栈-离线开发平台(BatchWorks)中数据同步任务的一个功能,需要结合任务的错误重试机制来完成。当任务失败时,它会在引擎中重试。重试时,会从上次失败时读取的位置继续读取数据,直到任务运行成功。

  

  实时采集

  实时采集是数据栈-实时开发平台(StreamWorks)中数据采集任务的一个功能。当数据源中的数据被添加、删除或修改时,同步任务会监控这些变化,并将变化的数据实时同步到目标数据源。除了实时数据变化,实时采集和离线数据同步的另一个区别是:实时采集任务不会停止,任务会一直监控数据源是否发生变化。这点与Flink任务是一致的,所以实时采集任务是数字栈流计算应用中的一种任务类型,配置过程与离线计算中的同步任务基本相同。

  

  2

  Flink 中的检查点机制

  可续传和实时采集都依赖于Flink的Checkpoint机制,所以先简单介绍一下。Checkpoint 是 Fl​​ink 容错机制的核心功能。它可以根据配置,根据Stream中各个Operator的状态,周期性的生成Snapshots,从而将这些状态数据定期持久化存储。当 Flink 程序意外崩溃时,它会重新运行 程序可以有选择地从这些 Snapshot 中恢复,从而纠正因故障导致的程序数据状态中断。

  

  当Checkpoint被触发时,一个barrier标签被插入到多个分布式的Stream Source中,这些barrier会随着Stream中的数据记录流向下游的算子。当运营商收到屏障时,它将暂停处理 Steam 中新收到的数据记录。因为一个Operator可能有多个输入Streams,每个Stream中都会有一个对应的barrier,所以Operator必须等待输入Stream中的所有barrier都到达。当流中的所有障碍都到达操作员时,所有障碍似乎都在同一时刻(表明它们已对齐)。在等待所有barrier到达的时候,operator的缓冲区可能已经缓存了一些比Barrier更早到达Operator的数据记录(Outgoing Records)。此时,Operator 将发出(Emit)数据记录(Outgoing Records)作为下游 Operator 的输入。最后,Barrier 将对应 Snapshot (Emit) 发送出去作为第二个 Checkpoint 的结果数据。

  3

  http

  先决条件

  同步任务必须支持断点续传,对数据源有一些强制要求:

  1、 数据源(这里特指关系型数据库)必须收录升序字段,例如主键或日期类型字段。检查点机制将在同步过程中用于记录该字段的值。恢复任务时将使用此字段。构造查询条件过滤同步数据。如果这个字段的值不是升序,那么在任务恢复时过滤的数据就会出错,最终会导致数据丢失或重复;

  2、 数据源必须支持数据过滤。否则,任务无法从断点处恢复,会造成数据重复;

  3、 目标数据源必须支持事务,比如关系数据库,临时文件也可以支持文件类型的数据源。

  任务操作详细流程

  我们用一个具体的任务来详细介绍整个过程。任务详情如下:

  数据源

  mysql表,假设表名为data_test,该表收录主键字段id

  目标数据源

  hdfs 文件系统,假设写入路径为 /data_test

  并发

  2

  检查点配置

  时间间隔为60s,checkpoint的StateBackend为FsStateBackend,路径为/flinkx/checkpoint

  作业编号

  用于构造数据文件的名称,假定为 abc123

  1) 读取数据 读取数据时,首先要构造数据片段。构造数据分片就是根据通道索引和检查点记录的位置构造查询sql。sql模板如下:

  select * from data_test where id mod ${channel_num}=${channel_index}and id > ${offset}

  如果是第一次运行,或者上一个任务失败的时候checkpoint没有触发,那么offset不存在,具体查询sql:offset存在时的第一个channel可以根据offset和channel确定:

  select * from data_testwhere id mod 2=0and id > ${offset_0};

  第二个频道:

  select * from data_testwhere id mod 2=1and id > ${offset_1};

  不存在偏移时的第一个通道:

  select * from data_testwhere id mod 2=0;

  第二个频道:

  select * from data_testwhere id mod 2=1;

  数据分片构建完成后,每个通道根据自己的数据分片来读取数据。2) 写数据前写数据:检查/data_test目录是否存在,如果目录不存在,则创建此目录,如果目录存在,执行2次操作;判断是否以覆盖方式写入数据,如果是,删除/data_test目录,然后创建目录,如果不是,执行3次操作;检查/data_test/.data目录是否存在,如果存在则先将其删除,然后再创建,确保没有其他任务因异常失败留下脏数据文件;写入hdfs的数据是单片写入的,不支持批量写入。数据会先写入/data_test/.data/目录,数据文件的命名格式为:channelIndex.jobId.fileIndex 收录三个部分:通道索引、jobId 和文件索引。3) 当checkpoint被触发时,FlinkX中的“status”代表的是标识字段id的值。我们假设触发检查点时两个通道的读写情况如图:

  检查点触发后,两个阅读器首先生成一个Snapshot记录阅读状态,通道0的状态为id=12,通道1的状态为id=11。快照生成后,会在数据流中插入一个barrier,这个barrier会和数据一起流向Writer。以 Writer_0 为例。Writer_0 接收 Reader_0 和 Reader_1 发送的数据。假设先收到了Reader_0的barrier,那么Writer_0就停止向HDFS写入数据,先把收到的数据放入InputBuffer,等待Reader_1的barrier到达。然后写出Buffer中的所有数据,然后生成Writer的Snapshot。整个checkpoint结束后,记录的任务状态为:Reader_0:id=12Reader_1:id=11Writer_0:id=无法确定Writer_1:id=无法确定任务状态会记录在配置的HDFS目录/flinkx/checkpoint/abc123中。因为每个Writer接收两个Reader的数据,每个通道的数据读写速率可能不同,所以Writer接收数据的顺序是不确定的,但这不影响数据的准确性,因为数据是read 这个时候只能用Reader记录的状态来构造查询sql,我们只需要确保数据真的写入HDFS即可。

  Writer 在生成快照之前,会做一系列的操作来保证所有接收到的数据都写入到 HDFS: a.关闭写入 HDFS 文件的数据流,此时会在 / data_test/.data 目录:/data_test/.data/0.abc123.0/data_test/.data/1.abc123.0b。将生成的两个数据文件移动到/data_test目录下;C、更新文件名模板为:channelIndex.abc123.1; 快照生成后,任务继续读写数据。如果在生成快照的过程中出现异常,任务会直接失败,所以不会生成这个快照,任务恢复时会从上次成功的快照恢复任务。4) 任务正常结束。

  select * from data_testwhere id mod 2=0and id > 12;

  第二个频道:

  select * from data_testwhere id mod 2=1and id > 11;

  这样就可以从上次失败的位置继续读取数据。

  支持可续传的插件

  理论上,只要支持过滤数据的数据源和支持事务的数据源能够支持续传功能,FlinkX目前支持的插件如下:

  读者

  作家

  mysql等关系数据读取插件

  HDFS、FTP、mysql等关系型数据库编写插件

  4

  实时采集

  目前 FlinkX 支持实时采集 插件,包括 KafKa 和 binlog 插件。binlog插件是专门为实时采集 mysql数据库设计的。如果要支持其他数据源,只需要将数据输入到Kafka,然后使用FlinkX的Kafka插件来消费数据。比如oracle,你只需要使用oracle的ogg将数据发送到Kafka即可。这里专门讲解mysql的实时采集插件binlog。

  二进制日志

  binlog 是由 Mysql 服务器层维护的二进制日志。它与innodb引擎中的redo/undo log是完全不同的日志;它主要用于记录更新或潜在更新mysql数据的SQL语句,并使用“事务”。表格保存在磁盘上。binlog的主要功能有:

  Replication:MySQL Replication在Master端打开binlog,Master将自己的binlog传递给slave并重放,达到主从数据一致性的目的;

  数据恢复:通过mysqlbinlog工具恢复数据;

  增量备份。

  MySQL 主备复制

  有记录数据变化的binlog日志是不够的。我们还需要用到MySQL的主从复制功能:主从复制是指一台服务器作为主数据库服务器,另一台或多台服务器作为从数据库服务器。数据自动复制到从服务器。

  

  主从复制的过程:MySQL主将数据变化写入二进制日志(二进制日志,这里的记录称为二进制日志事件,可以通过show binlog events查看);MySQL slave 将 master 的二进制日志事件复制到它的 Relay 日志;MySQL slave 重放中继日志中的事件,并将数据更改反映到自己的数据中。

  写入 Hive

  binlog插件可以监控多张表的数据变化。解析的数据收录表名信息。读取的数据可以写入目标数据库中的某个表,也可以根据数据中收录的表名信息进行写入。对于不同的表,目前只有 Hive 插件支持该功能。Hive插件目前只有一个写插件,功能是基于HDFS写插件实现的,也就是说从binlog读到hive写也支持故障恢复功能。

  

  写入Hive的过程:从数据中解析出MySQL表名,然后根据表名映射规则转换成对应的Hive表名;检查Hive表是否存在,如果不存在,则创建Hive表;查询Hive表的相关信息,构造HdfsOutputFormat;调用 HdfsOutputFormat 将数据写入 HDFS。

  

  欢迎了解袋鼠云数栈

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线