文章实时采集(就是数据更新日志的获取,就是debezium插件模式详解(组图))
优采云 发布时间: 2021-11-23 17:23文章实时采集(就是数据更新日志的获取,就是debezium插件模式详解(组图))
一、前言
随着业务的发展,之前的离线批量计算方式,延迟时间长,无法满足需求。随着flink等实时计算工具的出现,实时采集也成为大数据工作中非常重要的一部分。.
如今企业的数据来源大致分为两类:存储在各种关系数据库中的业务数据,网站或APP生成的用户行为日志数据
日志数据已经可以通过flume、kafka等工具实现实时,但是关系型数据库的同步还是基于批处理。
当关系型数据库的表数据达到一定程度时,批量同步时间过长,增量同步无法解决实时性要求
mysql可以通过binlog进行实时同步,技术比较成熟,但是不能解决SQLserver、Oracle、postgresql等数据库的问题。
即使有kafka这样的流数据分发订阅平台,flink这样的实时计算平台,redis这样的高效读写数据库,如果实时采集问题无法解决,那么就无法实现完整的实时链接。
好在国外有一个开源工具,可以实现对市面上各种常用数据库的数据更新日志的获取。它是 debezium。
插件模式
二、简介
Debezium 是一组分布式服务,用于捕获数据库中的更改,以便您的应用程序可以查看这些更改并对其做出响应。Debezium 在更改事件流中记录每个数据库表中的所有行级更改。应用程序只需要读取这些流就可以按照更改事件发生的顺序查看更改事件。
Debezium有两种运行方式,一种在kafka connect中作为插件继承,另一种作为独立服务运行(孵化)
服务器模式
今天我们要介绍的是插件模式。
三、部署
插件模式首先要求集群上已经安装了zookeeper和kafka。Kafka 可以连接到上游数据库。这里我使用flink消费kafka中的日志,并实时写入mysql。
所以还需要部署flink集群和mysql数据库
以上都具备后,就可以开始部署debezium了
1.下载安装包
#以mysql为例,下载debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.2.Final/debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
在kafka安装文件夹中创建connectors文件夹,将下载的debezium插件解压到connectors
2.创建话题
创建 kafka connect 需要的三个主题:connect-offsets、connect-configs、connect-status
3.编写kafka连接配置文件
创建 connect-distributed.properties 并分发到所有节点
#kafka-connect配置文件
# kafka集群地址
bootstrap.servers=ip1:9092,ip2:9092,ip3:9092
# Connector集群的名称,同一集群内的Connector需要保持此group.id一致
group.id=connect-cluster
# 存储到kafka的数据格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 内部转换器的格式,针对offsets、config和status,一般不需要修改
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# 用于保存offsets的topic,应该有多个partitions,并且拥有副本(replication)
# Kafka Connect会自动创建这个topic,但是你可以根据需要自行创建
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
offset.storage.partitions=3
# 保存connector和task的配置,应该只有1个partition,并且有多个副本
config.storage.topic=connect-configs
config.storage.replication.factor=2
# 用于保存状态,可以拥有多个partition和replication
status.storage.topic=connect-status
status.storage.replication.factor=2
status.storage.partitions=3
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# RESET主机名,默认为本机
#rest.host.name=
# REST端口号
rest.port=18083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# 保存connectors的路径
#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/opt/cloudera/parcels/CDH/lib/kafka/connectors
4.启动kafka-connect
注意:必须执行所有节点
cd /opt/cloudera/parcels/CDH/lib/kafka
bin/connect-distributed.sh -daemon config/connect-distributed.properties
###jps 可看到 ConnectDistributed 进程
5.通过POST URL提交连接请求
多个表名用逗号分隔,格式为db.table,参数中指定的topic为元数据topic,真正的topic名由server_name.db_name.table_name组成
POST:http://ip:18083/connectors
Headers:Content-Type: application/json
Body:{
"name" : "debezium-mysql",
"config":{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "host",
"database.port": "3306",
"database.user": "username",
"database.password": "password",
"database.server.id" :"1739",
"database.server.name": "mysql",
"database.history.kafka.bootstrap.servers": "ip1:9092,ip2:9092,ip3:9092",
"database.history.kafka.topic": "mysql.test",
"database.whitelist": "test",
"table.whitelist":"test.test_table2",
"include.schema.changes" : "true" ,
"mode" : "incrementing",
"incrementing.column.name" : "id",
"database.history.skip.unparseable.ddl" : "true"
}
}
提交完成后,使用GET:18083/connectors获取连接器信息
由于debezium没有构建topic的逻辑,所以Kafka需要开启自动生成topic的配置
检查kafka是否生成了对应的topic,上位源表的内容,如果topic中有对应的change log记录,则任务配置成功
有很多方法可以消费来自 Kafka 的数据。