我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Flink CDC 最佳实践(以 MySQL 为例)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Flink CDC 最佳实践(以 MySQL 为例)

1. 准备工作

1.1 确认 MySQL binlog 模式

确认 MySQL 数据库的 binlog 模式是否为 ROW。可以在 MySQL 命令行中执行以下语句确认:

SHOW GLOBAL VARIABLES LIKE 'binlog_format';

如果返回结果中的 Value 字段为 ROW,则说明 binlog 模式为 ROW

1.2 下载并安装 Flink

下载并安装 Flink,可以参考官方文档进行安装。

2. 配置 Flink CDC

2.1 配置 MySQL 数据库连接信息

在 Flink 的配置文件 flink-conf.yaml 中添加 MySQL 数据库连接信息,例如:

# MySQL connection configurationmysql.server-id: 12345mysql.hostname: localhostmysql.port: 3306mysql.username: rootmysql.password: 123456mysql.database-name: test

2.2 配置 CDC Job

在 Flink 的 CDC Job 配置文件 mysql-cdc.properties 中添加以下配置:

# Flink CDC Job Configurationname: mysql-cdc-jobflink.parallelism: 1flink.checkpoint.interval: 60000flink.checkpoint.mode: EXACTLY_ONCE# MySQL CDC Source Configurationdebezium.transforms: unwrapdebezium.transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordStatedatabase.hostname: localhostdatabase.port: 3306database.user: rootdatabase.password: 123456database.history.kafka.bootstrap.servers: localhost:9092database.history.kafka.topic: mysql-cdc-historydatabase.server.id: 12345database.server.name: testdatabase.whitelist: test.user

其中,name 为 CDC Job 的名称,flink.parallelism 为 Flink 的并行度,flink.checkpoint.interval 为 Flink 的 Checkpoint 时间间隔,flink.checkpoint.mode 为 Checkpoint 模式,此处设置为 EXACTLY_ONCE

debezium.transforms 为 Debezium 转换器的名称,此处设置为 unwrapdatabase.hostnamedatabase.portdatabase.userdatabase.password 分别为 MySQL 数据库的连接信息。database.history.kafka.bootstrap.servers 为 Kafka 的地址信息,database.history.kafka.topic 为 CDC 历史数据记录的 Kafka Topic。database.server.id 为 MySQL 的 Server ID,database.server.name 为 CDC Source 的名称,database.whitelist 为需要进行同步的 MySQL 表的名称。

步骤一:创建 MySQL 数据库

首先,需要在本地或云端创建 MySQL 数据库,并添加一个具有读写权限的用户。下面是一个创建名为 test_db 的数据库以及名为 flink_cdc_user 的用户的示例 SQL 代码:

CREATE DATABASE test_db;CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'password';GRANT ALL PRIVILEGES ON test_db.* TO 'flink_cdc_user'@'%';

步骤二:启动 Flink 集群

启动一个 Flink 集群以便运行 CDC 应用程序。可以使用 Flink 自带的 bin/start-cluster.sh 脚本启动 Flink 集群。确保 Flink 集群在运行时已经包含了 Kafka 和 MySQL 的依赖项。

步骤三:创建 MySQL 表和 CDC 表

在 MySQL 中,首先需要创建需要进行 CDC 的表和 CDC 表。CDC 表是一个系统表,它存储了需要捕获的更改数据。可以通过以下代码创建一个名为 test_table 的表以及与之关联的 CDC 表

CREATE TABLE test_db.test_table (  id INT PRIMARY KEY,  name VARCHAR(30),  age INT,  email VARCHAR(50)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE test_db.test_table_cdc (  `database` VARCHAR(100),  `table` VARCHAR(100),  `type` VARCHAR(10),  `ts` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),  `before` JSON,  `after` JSON) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤四:编写 Flink CDC 应用程序

接下来,需要编写一个 Flink CDC 应用程序,以将 MySQL 表更改推送到 Kafka 主题中。可以使用 Flink 的 flink-connector-jdbc 库和 flink-connector-kafka 库来实现此目的。

以下是一个基本的 Flink CDC 应用程序的代码示例:

public static void main(String[] args) throws Exception {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    env.setParallelism(1);    Properties properties = new Properties();    properties.setProperty("bootstrap.servers", "localhost:9092");    properties.setProperty("group.id", "test-group");    JdbcSource source = JdbcSource.builder()            .setDrivername("com.mysql.jdbc.Driver")            .setDBUrl("jdbc:mysql://localhost:3306/test_db")            .setUsername("flink_cdc_user")            .setPassword("password")            .setQuery("SELECT id, name, age, email FROM test_table")            .setRowTypeInfo(Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING))            .setFetchSize(1000)            .build();    DataStream stream = env.addSource(source);

以下是一个简单的示例运行及结果:

$ bin/flink run -c com.example.MyCDCJob ./my-cdc-job.jar --database.server=mysql.example.com --database.port=3306 --database.name=mydb --database.username=myuser --database.password=mypassword --table.name=mytable --debezium.plugin.name=mysql --debezium.plugin.property.version=1.3.1.Final
[INFO] Starting CDC process for table: mytable.[INFO] Initializing CDC source...[INFO] CDC source successfully initialized.[INFO] Starting CDC source...[INFO] CDC source successfully started.[INFO] Adding CDC source to Flink job topology...[INFO] CDC source successfully added to Flink job topology.[INFO] Starting Flink job...[INFO] Flink job started successfully.[INFO] Change data for table: mytable.[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 25}.[INFO] Record key: {"id": 2}, record value: {"id": 2, "name": "Bob", "age": 30}.[INFO] Record key: {"id": 3}, record value: {"id": 3, "name": "Charlie", "age": 35}.[INFO] Change data for table: mytable.[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 27}.

可以看到,当有数据变更时,Flink CDC Job 会输出变更的表名、记录的主键以及变更的数据。例如,在这个示例中,有一行记录的年龄字段从25变成了27。

来源地址:https://blog.csdn.net/lhyandlwl/article/details/129998737

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Flink CDC 最佳实践(以 MySQL 为例)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

应用开发实践之关系型数据库(以MySql为例)小结

多年开发实践中遇到的DB相关的话题研究和整理,不介绍DB的基本概念,也不过于深入DB原理,以满足日常应用、知其然知其所以然为准。包含十几个子话题,含事务传播性、索引优化、拆分、FailOver等。 本文主要是对目前工作中使用到的DB相关知识点的总
应用开发实践之关系型数据库(以MySql为例)小结
2018-01-10

编程热搜

目录