携程数据开发总监纪成:大数据领域开源技术框架

优采云 发布时间: 2021-07-14 21:47

  携程数据开发总监纪成:大数据领域开源技术框架

  作者介绍

  程冀,携程数据开发总监,负责金融数据基础组件及平台开发、数据仓库建设及治理相关工作。对大数据领域的开源技术框架有浓厚的兴趣。

  一、Background

  携程金融成立于2017年9月,本着实*敏*感*词*融助力出行的使命,开始全面发展集团风控和金融服务。需要在携程数据中心建设统一的金融数据中心,实现多地点、多机房数据。整合满足线上线下需求;它涉及将数千个 mysql 表同步到离线数据仓库、实时数据仓库和在线缓存。由于对跨区域、实时性、准确性和完整性的高要求,无法支持集团内DataX(业内常见的离线同步解决方案)的二次开发。以mysql-hive同步为例,DataX通过直连MySQL批量拉取数据,存在以下问题:

  二、项目概览

  基于以上背景,我们设计了一个基于binlog实时流式传输的数据库层构建方案,并取得了预期的效果。结构如图,各模块介绍:

  

  三、详细介绍

  本章以mysql-hive镜像为例,详细介绍技术方案。

  3.1.binlog采集

  canal 是阿里巴巴开源的 Mysql binlog 增量订阅消费组件。它在工业中被广泛使用。通过实时增量采集binlog,可以减轻mysql的压力,减少数据变化细粒度恢复的过程。 ,我们选择canal作为binlog采集的基础组件,根据应用场景进行二次开发。其中,raw binlog→simple binlog的消息格式转换是重点。

  以下是binlog采集的架构图:

  

  canal在1.1.4版本引入了canal-admin项目,支持面向WebUI的管理能力;我们使用原生canal-admin管理binlog采集,采集粒度为mysql实例级别。

  Canal Server 会从 canalAdmin 中拉取集群下的所有 mysql 实例列表。对于每个mysql instance采集任务,canal server通过在zookeeper中创建临时节点,通过zookeeper共享binlog位置来实现HA。

  canal1.1.1版本引入MQProducer原生支持Kafka消息传递。图中,实例active从mysql获取实时增量原创binlog数据,在MQProducer链接中进行raw binlog→simple binlog消息转换并发送给kafka。我们根据实例创建了对应的kafka主题,而不是每个数据库一个主题,主要是考虑到同一个mysql实例下有多个数据库。过多的topic(分区)导致kafka的随机IO增加,影响吞吐量。发送Kafka时,使用schemaName+tableName作为partitionKey,结合生产者的参数控制,保证同表的binlog消息按顺序写入Kafka。

  参考生产者参数控制:

  

max.in.flight.requests.per.connection=1retries=0acks=all

  主题级别的配置:

  

topic partition 3副本, 且min.insync.replicas=2

  考虑到保证数据的顺序性和容灾性,我们设计了轻量级的SimpleBinlog消息格式:

  

  金融目前部署了4组canal集群,每组2个物理机节点,跨机房部署,已经承担了数百个mysql实例binlog采集任务。 Canal 服务器自带的性能监控是基于 Prometheus 的。通过实施PrometheusScraper,我们主动拉取核心指标,推送到集团内部Watcher监控系统,配置相关告警。每个mysql实例的binlog采集延迟是全链路监控。重要指标。

  系统前期遇到canal-server实例脑裂的问题。具体场景是活动实例所在的canal-server。由于网络问题,zookeeper的链接超时。这时候备实例会抢先创建一个临时节点,成为一个新节点。活跃;还有一种情况是两个actives同时采集和push binlog。解决方法是在active实例和zookeeper链接超时后立即自杀,再次发起下一轮抢占。

  3.2 历史数据回放

  有两种场景需要我们采集historical data:

  有两种选择:

  1) 从mysql批量拉取历史数据上传到HDFS。需要考虑批量拉取的数据和binlog采集生成的mysql-hive镜像的格式差异,比如去重主键的选择,排序字段的选择。

  2)Streaming模式,批量从mysql中拉取历史数据,转换成简单的binlog消息流写入kafka,用实时采集的简单binlog流复用后续处理过程。合并生成mysql-hive镜像表时,需要保证这部分数据不会覆盖实时采集简单binlog数据。

  我们选择了更简单、更易于维护的选项 2,并开发了一个 binlog-mock 服务。根据用户给出的库、表(前缀)和条件,我们可以批量选择(例如每次选择10000行)mysql查询数据,组装成simple_binlog消息发送给Kafka。

  对于mocks的历史数据,需要注意:

  3.3 Write2HDFS

  我们使用 spark-streaming 将 Kafka 消息持久化到 HDFS,每 5 分钟一批,在提交消费者偏移量之前处理一批数据(持久化到 HDFS),以确保消息至少被处理一次;还要考虑分库分表的问题,数据倾斜:

  阻塞分库分表:以order表为例,mysql数据存放在ordercenter_00 ... ordercenter_99 100个数据库中,每个数据库有orderinfo_00...orderinfo_99 100个表,以及数据库prefix schemaNamePrefix=ordercenter ,表前缀 tableNamePrefix=orderinfo 统一映射到 tableName=${schemaNamePrefix}_${tableNamePrefix};根据binlog executeTime字段生成对应的分区dt,保证同一天同一个数据库表的数据落入同一个分区目录:base_path/ods_binlog_source.db/${database_prefix}_${table_prefix}/dt ={binlogDt}/binlog-{timestamp}-{rdd.id}

  防止数据倾斜:数据倾斜的问题经常发生在系统的早期阶段。调查发现,业务在特定时间段内为单张表批量运行产生的binlog量特别大,并且一张表的数据和一批数据需要写入同一个HDFS文件,一个表的写入速度单个 HDFS 文件成为瓶颈。所以加了一个链接(步骤2),过滤掉当前batch中的“大表”,将这些大表的数据写入多个HDFS文件中。

  base_path/ods_binlog_source.db/${database_prefix}_${table_prefix}/dt={binlogDt}/binlog-{timestamp}-{rdd.id}-[${randomInt}]

  

  3.4 生成镜像

  3.4.1 数据就绪检查

  spark-streaming 作业每隔 5 分钟将 kafka simple_binlog 消息分批持久化到 HDFS,合并任务每天执行一次。每天 0:15,数据就绪检查开始。我们监控了消息的整个环节,包括binlog采集delay t1、kafka同步延迟t2、spark-streaming消费者延迟t3。假设当前时间是早上0:30,设置为t4。如果t4>(t1+t2+t3),则表示T-1天的数据全部落入HDFS,可以执行下游ETL操作(merge)。

  

  3.4.2 合并

  HDFS上的简单binlog数据准备好后,下一步就是恢复对应的MySQL业务表数据。下面是Merge的执行过程,步骤如下:

  1)加载T-1分区的简单binlog数据

  数据就绪检查通过后,通过MSCK REPAIR PARTITION加载T-1分区的simple_binlog数据。注:此表为原创简单binlog数据,具体mysql表的字段没有平铺。如果是第一次镜像mysql-hive,历史数据回放的简单binlog也会落入T-1分区。

  2)检查Schema并提取T-1增量

  请求镜像后端获取最新的mysql schema。如果有变化,更新mysql-hive镜像表(snap),让下游不知道;同时根据mysql schema字段列表和“hive主键”等配置信息,从上面的simple_binlog分区中提取mysql表的T-1天详细数据(delta)。

  3)判断业务库中是否发生了归档操作,判断后续合并时是否忽略DELETE事件。

  业务DELETE数据有两种情况:业务维修单等导致的正常DELETE,需要同步到Hive;业务数据库归档历史数据产生的DELETE,这种DELETE操作需要忽略。

  在系统上线初期,我们等待业务或DBA的通知,然后手动处理,很麻烦。很多情况下,通知不到位,导致Hive数据丢失历史数据。为了解决这个问题,在Merge之前会自动判断流程,参考规则如下:

  4)合并delta数据(delta)和当前快照(snap T-2)去重复,得到最新的snap T-1。

  

  下面用一个例子来说明合并过程。假设订单表有三个字段:id、order_no、amount,id是全局唯一的; snap表t3为mysql-hive镜像,合并过程如图。

  

  3.4.3 检查

  数据合并完成后,为了保证mysql-hive镜像表中数据的准确性,会将hive表和mysql表进行字段和数据量的比对,作为最后一道防线我们在配置mysql-hive镜像的时候,会指定一个检查条件,一般是根据createTime字段来比较7天的数据;镜像后台每天早上都会从mysql中预先计算过去7天的增量,离线任务会通过脚本(http)获取,以上数据用snap表验证。实践中遇到的一些问题:

  1)T-1的binlog落在T分区

  检查服务根据createTime生成查询条件,检查mysql和Hive数据。因为业务SQL中createTime和binlog executeTime不一致,凌晨前后1秒,会导致Hive漏掉这个数据。可以通过一起加载T-day分区的binlog数据重新合并。

  2) 业务表迁移,原表停止更新。虽然mysql和hive的数据量是一样的,但是已经不能满足要求了。这种情况可以通过波动找到。

  3.5 其他

  在实践中,可以根据需要在binlog采集及后续消息流中引入一些数据治理工作。例如:

  1)明文检测:binlog采集link对核心数据库表数据进行实时明文检测,可以防止敏感数据流入数据仓库;

  2)Standardization:一些领域的标准化操作,比如id映射,不同密文的映射;

  3)Metadata:mysql→hive镜像是数据仓库ODS的核心。根据采集配置信息,可以实现两者映射关系的双向检索,方便数据仓库溯源。这是财务元数据管理的重要组成部分。

  通过消费binlog将mysql镜像到实时数仓(kudu、es)和在线缓存(redis)的逻辑比较简单,篇幅有限,本文不再赘述。

  四、Summary 和展望

  基于binlog的金融数据基础层建设方案顺利完成预期目标:

<p>该解决方案已成为金融线上线下服务的基石,并不断拓展其使用场景。未来将在自动化配置(mirror-admin和canal-admin一体化,实现一键搭建)、智能运维(异常数据检查识别与恢复)、元数据管理等方面投入更多。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线