基于Debezium的实时计算工具插件模式研究(组图)
优采云 发布时间: 2021-05-16 07:14基于Debezium的实时计算工具插件模式研究(组图)
一、前言
随着业务的发展,以前的离线批量计算方法由于时延长而无法满足需求。随着诸如flink之类的实时计算工具的出现,实时采集也已成为大数据工作中非常重要的一部分。响。
当前,企业的数据源大致分为两种:存储在各种关系数据库中的业务数据网站或由APP生成的用户行为日志数据
可以通过flume,kafka和其他工具采集实时实现日志数据,但是关系数据库的同步仍然基于批处理。
当关系数据库的表数据达到一定水平时,批处理同步会花费很长时间,增量同步无法解决实时性要求
mysql可以通过binlog进行实时同步,技术相对成熟,但是不能解决SQLserver,Oracle,postgresql等数据库的问题。
即使无法解决实时采集问题,即使存在诸如kafka之类的流数据分发订阅平台,诸如flink之类的实时计算平台以及诸如redis之类的高效读写数据库,也是如此。 ,则无法实现整体的实时链接。
幸运的是,国外有一个开源工具可以实现市场上各种常用数据库的数据更新日志的获取。是德比兹
插件模式
二、简介
Debezium是一组分布式服务,用于捕获数据库中的更改,以便您的应用程序可以查看这些更改并做出响应。 Debezium在更改事件流中的每个数据库表中记录所有行级更改。应用程序只需要读取这些流就可以按照更改事件发生的顺序查看更改事件。
Debezium有两种操作模式,一种是通过插件形式从kafka connect继承的,另一种是作为独立服务(正在孵化)运行的
服务器模式
今天我们将介绍插件模式。
三、部署
插件模式首先要求在集群上安装了zookeeper和kafka。 Kafka可以连接到上游数据库。在这里,我使用flink来消耗kafka中的日志并将其实时写入mysql
因此,您还需要部署flink集群和mysql数据库
以上所有选项均可用后,您可以开始部署debezium
1.下载安装包
#以mysql为例,下载debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.2.Final/debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
在kafka安装文件夹中创建一个连接器文件夹,然后将下载的debezium插件解压缩到连接器
2.创建主题
创建Kafka连接需要三个主题:连接偏移量,连接配置,连接状态
3.编写kafka connect配置文件
创建connect-distributed.properties并将其分发到所有节点
#kafka-connect配置文件
# kafka集群地址
bootstrap.servers=ip1:9092,ip2:9092,ip3:9092
# Connector集群的名称,同一集群内的Connector需要保持此group.id一致
group.id=connect-cluster
# 存储到kafka的数据格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 内部转换器的格式,针对offsets、config和status,一般不需要修改
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# 用于保存offsets的topic,应该有多个partitions,并且拥有副本(replication)
# Kafka Connect会自动创建这个topic,但是你可以根据需要自行创建
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
offset.storage.partitions=3
# 保存connector和task的配置,应该只有1个partition,并且有多个副本
config.storage.topic=connect-configs
config.storage.replication.factor=2
# 用于保存状态,可以拥有多个partition和replication
status.storage.topic=connect-status
status.storage.replication.factor=2
status.storage.partitions=3
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# RESET主机名,默认为本机
#rest.host.name=
# REST端口号
rest.port=18083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# 保存connectors的路径
#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/opt/cloudera/parcels/CDH/lib/kafka/connectors
4.启动kafka-connect
注意:必须执行所有节点
cd /opt/cloudera/parcels/CDH/lib/kafka
bin/connect-distributed.sh -daemon config/connect-distributed.properties
###jps 可看到 ConnectDistributed 进程
5.通过POST URL提交连接请求
多个表名用逗号分隔,格式为db.table,参数中指定的主题为元数据主题,真实主题名称由server_name.db_name.table_name组成
POST:http://ip:18083/connectors
Headers:Content-Type: application/json
Body:{
"name" : "debezium-mysql",
"config":{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "host",
"database.port": "3306",
"database.user": "username",
"database.password": "password",
"database.server.id" :"1739",
"database.server.name": "mysql",
"database.history.kafka.bootstrap.servers": "ip1:9092,ip2:9092,ip3:9092",
"database.history.kafka.topic": "mysql.test",
"database.whitelist": "test",
"table.whitelist":"test.test_table2",
"include.schema.changes" : "true" ,
"mode" : "incrementing",
"incrementing.column.name" : "id",
"database.history.skip.unparseable.ddl" : "true"
}
}
提交完成后,使用GET:18083 / connectors获取连接器信息
由于debezium没有建立主题的逻辑,因此Kafka需要打开自动生成主题的配置
检查kafka是否生成了相应的主题,即较高源表的内容,如果该主题中有相应的更改日志记录,则说明任务配置成功
有很多方法可以从Kafka消费数据。