我的编程空间,编程开发者的网络收藏夹
学习永远不晚

最新版Flink CDC MySQL同步MySQL(一)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

最新版Flink CDC MySQL同步MySQL(一)

1.概述

Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。
在这里插入图片描述

2.支持的连接器

连接器数据库驱动
mongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4
mysql-cdcMySQL: 5.6, 5.7, 8.0.x、RDS MySQL: 5.6, 5.7, 8.0.x、PolarDB MySQL: 5.6, 5.7, 8.0.x、Aurora MySQL: 5.6, 5.7, 8.0.x、MariaDB: 10.x、PolarDB X: 2.0.1JDBC Driver: 8.0.28
oceanbase-cdcOceanBase CE: 3.1.x, 4.x、OceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.x
oracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0
postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1
sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8
tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27
db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0
vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

3.支持的 Flink 版本

下表显示了 Flink CDC Connectors 与 Flink ®的版本对应关系:

Flink CDC版本_Flink 版本_
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*
2.1.*1.13.*
2.2.*1.13.*、1.14.*
2.3.*1.13.*、1.14.*、1.15.*、1.16.0
2.4.*1.13.*、1.14*、1.15.*、1.16.*、1.17.0

4.特征

支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理。

DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。

Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改。

5.表/SQL API 的用法

我们需要几个步骤来使用提供的连接器设置 Flink 集群。

首先我们安装了 1.17+ 版本的 Flink 集群(java 8+)。

注意: 如果需要安装Flink请查看笔者对应的博客 flink高可用集群搭建(Standalone模式)
本文用到的jar包flink-connector-jdbc-3.1.1-1.17.jar和flink-sql-connector-mysql-cdc-2.2.1.jar

下载 连接器 SQL jar (或自行构建)。

将下载的jar包放在FLINK_HOME/lib/.

重启Flink集群。

注意:目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的

6.使用 Flink CDC 对 MySQL 进行流式 ETL

本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。

假设我们将产品数据存储在MySQL中,同步到另外一个MySQL中

在下面的章节中,我们将介绍如何使用 Flink Mysql CDC 来实现它。本教程中的所有练习均在 Flink SQL CLI 中进行,整个过程使用标准 SQL 语法,无需任何 Java/Scala 代码,也无需安装 IDE。

架构概述如下:
在这里插入图片描述

7.环境准备

需要准备安装好的MySQL数据库,具体MySQL数据怎么安装请查看笔者的博客Ubuntu数据库安装(mysql)

注意: 如果是其他操作系统请查看其他博客对应的数据库安装教程

8.在 Flink SQL CLI 中使用 Flink DDL 创建表

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。
在这里插入图片描述首先,每 3 秒启用一次检查点

-- Flink SQL                   Flink SQL> SET execution.checkpointing.interval = 3s;

编辑源数据库Flink Sql代码,如下所示:

CREATE TABLE products ( id INT NOT NULL, name STRING, description STRING, PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的 'port' = '3306', #源数据库端口 'username' = 'root',#源数据库账号 'password' = '*****',#源数据库密码 'database-name' = 'mydb',#源数据库 'table-name' = 'products'#源数据库表);

在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表

-- Flink SQLFlink SQL> CREATE TABLE products (    id INT,    name STRING,    description STRING,    PRIMARY KEY (id) NOT ENFORCED  ) WITH (    'connector' = 'mysql-cdc',    'hostname' = '192.168.50.163',    'port' = '3306',    'username' = 'root',    'password' = '****',    'database-name' = 'mydb',    'table-name' = 'products'  );

编辑目标数据库Flink Sql代码,如下所示:

CREATE TABLE product (    id INT,    name STRING,    description STRING,    PRIMARY KEY (id) NOT ENFORCED  ) WITH (    #引入的jdbc jar包驱动,没有引入会报错提示需要引入 flink-connector-jdbc    'connector' = 'jdbc',    #目标数据库连接url地址,可以根据自己的具体设置,此处为笔者本机的。部分高版本的MySQL需要添加useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',    #需要访问的数据库驱动    'driver' = 'com.mysql.cj.jdbc.Driver',    #目标数据库账号    'username' = 'root',    #目标据库密码    'password' = '***',    #目标数据库表    'table-name' = 'product'  );

在Flink SQL 执行以下语句创建捕获更改数据的表与目标数据库表的映射关系

-- Flink SQLFlink SQL> CREATE TABLE product (    id INT,    name STRING,    description STRING,    PRIMARY KEY (id) NOT ENFORCED  ) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',    'driver' = 'com.mysql.cj.jdbc.Driver',    'username' = 'root',    'password' = 'root',    'table-name' = 'product'  );

9.将源数据表加载到目标MySQL

使用Flink SQL将表product与 表查询products表写入目标MySQL。

-- Flink SQLFlink SQL> insert into product select * from products;

具体操作步骤如下所示:
在这里插入图片描述

这是源数据库,操作添加数据,如下图所示:
在这里插入图片描述
目标数据库同步操作如下图
在这里插入图片描述

10.flink可视化界面查看Running JOBS

红框勾选为运行的同步任务
在这里插入图片描述
至此Flink CDC MySQL同步MySQL第一节讲解完毕,后面会更新其复杂操作

来源地址:https://blog.csdn.net/weixin_43114209/article/details/131553658

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

最新版Flink CDC MySQL同步MySQL(一)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

编程热搜

目录