完美:基于Binlog实时同步数仓,有哪些不为人知的坑?
优采云 发布时间: 2020-09-03 17:32基于Binlog实时同步数据仓库,未知坑是什么?
我最近看到了一篇文章文章,该文章主要介绍了基于Flink的MySQL Binlog数据采集的解决方案。在查看了实际的方法和特定的代码操作之后,我感到其中存在一些考虑不周,不足的情况。
作者以前有一些类似的采集工具实践摘要,但没有整体进行系统总结,因此我想知道是否可以编写个人摘要文章,总结Binlog 采集中的问题]以及相应的解决方案?
也许很多人对Binlog不够了解,他们会肤浅地思考:“它不是MySQL生产的,它具有固定的日志结构。将数据采集放入,然后使其成为数据登录。这有什么困难?”
实际上,它实际上是一个日志,但实际上,从场景分析到技术选择的Binlog 采集,整个内部都有很多未知坑,所以请不要小看它。
作者写了这篇文章,目的是展示在实际工作中Binlog数据采集开发过程的原理,注意事项和可能出现的问题,并且还会有一些作者的个人结论。供参考的数据采集中的原则是干货。
一、 Binlog实时采集摘要原则
首先,撇开技术框架的讨论,亲自总结Binlog日志采集的数据的主要原理:
分别解释这三个原理的具体含义。
原则一
在数据采集中,数据着陆通常使用时间分区进行着陆,因此我们需要确定固定的时间戳作为时间划分的基本时间序列。
在这种情况下,业务数据上的时间戳字段似乎是从实际开发中获取此时间戳的角度来看,还是在实际表中将存在此类时间戳的情况下,对于所有表来说都是不可能的完全满足。
举个反例:
表:业务时间戳(或事件时间)
表A:create_time,update_time
表B:create_time
表C:create_at
表D:无
在这种情况下,理论上,可以通过在设计表时限制RD和DBA的表结构的正则化来统一和限制时间戳和命名,但是在实际工作中,这种情况基本上是不可能的。完成,我相信许多读者都会遇到这种情况。
也许很多学习数据采集的学生会认为,我们可以要求他们制定标准吗?
我个人认为是有可能的,但是大数据采集的基础数据不能完全依赖于这种相互建立的标准。
三个原因:
因此,如果要使用唯一的固定时间序列,则必须将其与业务数据分开。我们想要的时间戳不受业务数据更改的影响。
原则二
在业务数据库中,肯定会有表结构更改。在大多数情况下,会添加列,但也有诸如列重命名和列删除之类的情况,并且字段更改的顺序是无法控制的。
此原理要描述的是,导入到数据仓库中的表必须适应数据库表的各种操作,以保持其可用性和列数据的正确性。
原则三
可以追溯此数据,包括两个方面:
第一个描述是,在采集 binlog 采集端,您可以再次按下位置采集 binlog。
第二个描述是,在消费二进制日志登陆结束时,可以使用重复的消费来重新登陆数据。
这是作者的个人总结,无论选择哪种技术进行组合构造,都必须具备这些原则。
二、实施计划和具体操作
技术架构:Debezium + Confluent + Kafka + OSS / S3 + Hive
基于原理一的解决方案
Debezium提供了“新记录状态提取”的配置选项,相当于提供了一个转换运算符来提取binlog中的元数据。
对于0. 10版本的配置,您可以提取Binlog元数据信息,例如表,版本,连接器,名称,ts_ms,db,server_id,文件,pos,行等。
其中ts_ms是binlog日志的生成时间,这是binlog元数据,可以将其应用于所有数据表,并且可以使用此固定时间戳完全实现我们的时间戳,而无需完全了解数据表的内部结构一个。
关于Debezium,不同版本之前的配置参数可能不同。如果读者需要练习,则需要在官方文档中确认相应版本的配置参数。
对于市场上更常用的其他框架(例如Canal),或者如果读者拥有自己的数据采集程序,建议提取所有binlog元数据。可以使用此过程和后续过程。
基于原理二的解决方案
对于Hive而言,当前的主流数据存储格式为Parquet,ORC,Json,Avro。
除了讨论数据存储的效率。
对于前两种数据格式,列存储,也就是说,这两种数据格式的数据读取将严格取决于数据表中数据存储的顺序。该数据格式不符合要求。数据列可以灵活地添加,删除和其他操作。
Avro格式是行存储,但是它需要依赖于模式注册服务。考虑到Hive的数据表读取完全取决于外部服务,因此风险太大。
最后,确定使用Json格式进行数据存储。尽管这种读取和存储效率不如其他格式高,但是它可以确保可以在Hive中读取业务数据中的任何更改。
Debezium组件采集 binlog的数据为json格式,符合预期的设计方案,可以解决由原则2引起的问题。
对于在市场上更常用的诸如Canal之类的其他框架,可以将其设置为Json数据格式进行传输,或者如果读者需要开发自己的data 采集程序,则同样适用。
基于原理三的解决方案
在采集 binlog 采集侧,您可以再次按位置采集 binlog。
官方Debezium 网站也提供了此解决方案的实现。官方的Debezium 网站中也提供了相应的解决方案。为了简要说明,需要使用Kafkacat工具。
对于采集的每个MySQL实例,在创建数据采集任务时,Confluent将相应地创建连接器采集元数据的主题(即采集程序),该主题将存储相应的时间戳,文件位置和位置。您可以通过修改此数据来重置采集 binlog的位置。
值得注意的是,此操作的时间节点也受到限制,这与MySQL Binlog日志保留期有关。因此,以这种方式回溯时,您需要确认MySQL日志仍然存在。
重新登陆数据以重复使用。
该计划是基于Kafka的,并且有很多计划可以在线重新设计Kafka的消费抵消消费网站,因此在此我不再赘述。
对于让读者自己实现它,足以确认所选的MQ支持此功能。
常见问题解答:#how_to_change_the_offsets_of_the_source_database
三、不同的业务场景
本节仅描述如何在作者的技术框架下实施以下操作。读者可以根据自己选择的技术组件探索不同的技术解决方案。
1、数据库子数据库子表情况
基于Debezium的体系结构,一个Source只能对应采集的一个MySQL实例。要在同一实例上拆分表,可以使用Debezium主题路由功能。
采集过滤二进制日志时,请根据常规匹配将与采集对应的表写入指定的主题。
对于子数据库,还需要在*敏*感*词*侧添加RegexRouter转换运算符,以在主题之间进行合并和写入操作。
2、数据增量采集和总量采集
对于采集组件,当前配置基于增量,因此无论您选择Debezium还是Canal,正常配置都可以。
但是有时在某些情况下需要采集的完整表,而作者也为全部数据采集给出了计划。
计划一:
Debezium本身具有这样的功能,您需要将snapshot.mode参数选择设置为when_needed,以便可以完成表的采集操作总量。
在正式文档中,此处的参数配置有更详细的描述。
快照:#快照
计划二:
同时使用sqoop并增加采集。
此方案适用于表数据已经很多但当前binlog数据频率不高的情况,请使用此方案。
值得注意的是,有两点:
3、离线重复数据删除条件
数据到达后,binlog原创数据通过json表映射出来,然后问题来了,我们如何找到最新的数据?
也许我们可以简单地认为仅使用提取的ts_ms然后进行反演是不够的?
在大多数情况下,这确实是可能的。但是,在实际开发中,作者发现这种情况不能满足所有情况,因为在binlog中,可能有两个与ts_ms和PK相同但确实不同的数据。
那我们如何同时解决两个数据呢?答案就在上面,我们只是建议提取所有binlog元数据。
选择*
从
(
SELECT *,
row_number()over(按t.id分区或按t.`__ts_ms` DESC,t.`__file` DESC,cast(t.`__pos` AS int)DESC)按order_by
FROM测试t
哪里dt ='{pt}'
AND hour ='{now_hour}'
)t1
其中t 1. order_by = 1
在此sql中解释row_number的条件:
通过这种条件组合得出的数据是最新的。
一些读者可能会问,如果删除了这部分数据该怎么办?这样检索到的数据不是错误的吗?
此Debezium也具有相应的操作,并且具有相应的配置选项供您选择如何处理删除行为的binlog数据。
作为所有人的参考,作者选择了rewrite的参数配置,因此在上面sql的最外层中,仅需要判断“ delete ='false'”是正确的数据。
Debezium:
四、体系结构摘要
在选择技术以及整体和详细结构时,作者始终坚持原则-过程尽可能简单而不是尽可能简单。数据链接越长,链接可能出现的问题就越多。对于以后的锁定问题以及操作和维护,难度将很高。
因此,作者在技术选择中也考虑了Flink + Kafka方法,但是基于当时的现状,作者没有选择这种技术选择,并且作者还将说明原因。
1)作者的Flink环境尚未针对开发,操作和维护进行平台化。
2)场景偏向于数据采集和传输而不是计算,并且Flink的优势不多。
3)如果您基于MySQL实例开发Flink程序并使用本机Flink Steaming进行api风格的程序开发,如果该程序由于某些表中的数据而挂起,则该实例中的数据将不会可用采集这种影响的范围太大。
4)如果您基于表制作Flink程序或以常规方式匹配某些表,尽管可以保证灵活性,但是90%的代码是多余的,将有许多任务。浪费资源。
5)最后一个问题是开发和维护的效率。如果仅编写本机Flink程序,则随后的累积开发将使该程序越来越重,并且逻辑可能变得越来越麻烦。
总而言之,我当时想到了Flink。如果Flink并非基于平台进行开发,运行和维护监控,则可以将其用作临时解决方案。但是,如果Flink在以后的开发过程中被缝制和修复,那么在人类的开发下会出现更多的问题,或者每个人都在这样的程序框架下构建轮子,并且它们构建的越多,速度就越慢。而且,后期的主要项目方向并未将Flink平台化提上议事日程,因此考虑到某些未来条件也是一种选择。
因此,当我最终决定选择技术时,我没有选择Flink。
五、结论
本文的作者更具理论性,也是该方案的技术思想的摘要。技术架构方案有很多种。我只是选择其中之一进行实施。我也希望您有其他交流的技术方案或理论。请纠正我。
作者丨李楠来源丨数据仓库和Python大数据(ID:dw_zzxx)dbaplus社区欢迎技术人员的贡献。贡献电子邮件: