我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Flink CDC 基于mysql binlog 实时同步mysql表

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Flink CDC 基于mysql binlog 实时同步mysql表

环境说明:

flink 1.15.2

mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据

windows11 IDEA 本地运行

先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation

mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相当于没用,不知道是不是ndbcluster 下的binlog 配置是否有问题,但是同一集群下,InnoDB的表就可以捕获到binlog日志。听朋友说,ndbcluster 是内存型引擎,有可能不会实时写日志到磁盘,所以捕获不到.....)

# 判断MySQL是否已经开启binlog   on  为打开状态
SHOW VARIABLES LIKE 'log_bin';    

# 查看MySQL的binlog模式
show global variables like "binlog%";

# 查看日志开启状态 
show variables like 'log_%';

# 刷新log日志,立刻产生一个新编号的binlog日志文件,跟重启一个效果 
flush logs;

# 清空所有binlog日志 
reset master;

创建一个用户,赋权

CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'flink@cdc';
GRANT ALL PRIVILEGES ON *.* TO 'flink_cdc_user'@'%';

maven依赖:

         8        8        1.15.2                    org.apache.flink            flink-clients            ${flink.version}                            org.apache.flink            flink-streaming-java            ${flink.version}                            org.apache.flink            flink-runtime-web            ${flink.version}                            org.apache.flink            flink-table-planner_2.12            ${flink.version}                                        org.apache.flink            flink-connector-jdbc            ${flink.version}                                                    mysql            mysql-connector-java            8.0.29                                        org.projectlombok            lombok            1.18.22                                    com.ververica            flink-sql-connector-mysql-cdc            2.3.0                                        org.apache.flink            flink-connector-jdbc            1.15.2                                                    org.apache.flink            flink-connector-base            ${flink.version}                        

若是打包到集群运行,相关依赖要放开 provided,这样就不会把依赖打入到jar包里面,就不会和flink lib里面的jar包冲突。

lib 里面需要加入的包:从官网下载,放入即可

flink-connector-jdbc-1.15.4.jar

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

flink-sql-connector-mysql-cdc-2.3.0.jar

mysql-connector-java-8.0.29.jar

commons-cli-1.5.0.jar

mysql建表如下:

#mysql建表:

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `user_sink` (
  `id` int(11) NOT NULL,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

测试demo如下:

package com.xgg.flink.stream.sql;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class MysqlToMysqlHavePrimaryKey {    public static void main(String[] args) {        //1.获取stream的执行环境        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        senv.setParallelism(1);        //2.创建表执行环境        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);        String sourceTable = "CREATE TABLE mysql_cdc_source (" +                "  id INT,\n" +                "  username STRING,\n" +                "  password STRING,\n" +                "PRIMARY KEY(id) NOT ENFORCED\n" +                ") WITH (\n" +                "'connector' = 'mysql-cdc',\n" +                "'hostname' = 'localhost',\n" +                "'port' = '3306',\n" +                "'username' = 'root',\n" +                "'password' = 'root',\n" +                "'database-name' = 'test_cdc',\n" +                "'debezium.snapshot.mode' = 'initial',\n" +                "'table-name' = 'user'\n" +                ")";        tEnv.executeSql(sourceTable);        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +                "  id INT,\n" +                "  username STRING,\n" +                "  password STRING,\n" +                "PRIMARY KEY(id) NOT ENFORCED\n" +                ") WITH (\n" +                "'connector' = 'jdbc',\n" +                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +                "'url' = 'jdbc:mysql://localhost:3306/test_cdc?rewriteBatchedStatements=true',\n" +                "'username' = 'root',\n" +                "'password' = 'root',\n" +                "'table-name' = 'user_sink'\n" +                ")";        tEnv.executeSql(sinkTable);        tEnv.executeSql("insert into mysql_cdc_sink select id,username,password from mysql_cdc_source");        tEnv.executeSql("select * from mysql_cdc_source").print();    }}

源表进行操作,flink cdc 捕获操作记录进行打印,然后插入到表中。(mysql的cdc可以一边打印,一边写表,无问题。oracle的cdc,如果有多个执行操作,就会只执行一个,比如,先打印再写表,oracle只能打印,写表操作就不会触发。如果不打印,只写表,那就没问题。好像和senv.setParallelism(1);没关系,应该还是底层实现的问题。)

user 源表和目标表 user_sink,数据都如下。

 源表和目标表都是在Mysql有主键的,所以找个参数虽然是初始化操作,后面插入也是 insert into ,但是不管执行多少遍,都不会有重复的数据。

"'debezium.snapshot.mode' = 'initial',\n" +
?rewriteBatchedStatements=true 这个参数是开启批量写,能加大写速度。

来源地址:https://blog.csdn.net/qq_41875667/article/details/131382802

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Flink CDC 基于mysql binlog 实时同步mysql表

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

基于Flink CDC实时同步数据(MySQL到MySQL)

一、环境 jdk8Flink 1.16.1(部署在远程服务器:192.168.137.99)Flink CDC 2.3.0MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 二、准备 准备三个数据库:flink_
2023-08-16

基于OGG实现MySQL实时同步

?????? 哈喽!大家好,我是【IT邦德】,江湖人称jeames007,10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】!??? 中国DBA联盟(ACDU)成员,目前服务于工业互联网 擅长主流Ora
基于OGG实现MySQL实时同步
2023-12-22

基于Canal以及消息队列实现MySQL的Binlog近实时同步

基于Canal以及消息队列实现MySQL的Binlog近实时同步 1.canal的应用场景 目前普遍基于日志增量订阅和消费的业务,主要包括 基于数据库增量日志解析,提供增量数据订阅和消费数据库镜像数据库实时备份索引构建和实时维护(拆分异构索
2023-08-21

编程热搜

目录