我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Flink CDC 实时mysql到mysql

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Flink CDC 实时mysql到mysql

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。

mysqlcdc需要mysql开启binlog,找到my.cnf,在[mysqld]中加入如下信息

[mysqld]

server-id=1

log-bin=mysql-bin

binlog-format=row

重启数据库。

2.创建springboot项目,pom添加依赖


1.8
1.13.6
2.11
1.7.30




org.apache.flink
flink-table-planner-blink_2.11
1.13.6




org.apache.flink
flink-java
${flink.version}



org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}


mysql
mysql-connector-java
8.0.17


org.apache.flink
flink-table-api-java
${flink.version}




org.apache.flink
flink-table-api-java-bridge_${scala.binary.version}
${flink.version}




com.ververica
flink-connector-mysql-cdc
2.2.0



org.apache.flink
flink-connector-jdbc_2.12
1.13.1






org.apache.maven.plugins
maven-shade-plugin
3.1.0


package

shade




*:*

META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA








Flink cdc实现mysql到mysql代码

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {

public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// 注册源表和目标表
tEnv.executeSql("create table sourceTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc
"'connector' = 'mysql-cdc'," +
"'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'database-name' = 'quarant_db',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'password' = 'admin'\n" +
")");
// Table result = tEnv.sqlQuery("SELECT id, name,card_num,phone,address FROM quarantine");
// tEnv.registerTable("sourceTable",result);
tEnv.executeSql("create table targetTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc
"'connector' = 'jdbc'," +
"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
" 'table-name' = 'organization_info',\n" +
" 'username' = 'root',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'password' = 'admin'\n" +
")");
// 执行CDC过程
String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
tEnv.executeSql(query).print();
}
}

运行Main方法

Flink会同步源表数据到目标表,后续源表的增删改都会实时同步至目标表中。

3.将程序打包成flink jar

idea使用快捷键control+alt+shift+s,点击Artifacts->JAR

 选择Main class,点击ok

 然后选择上面菜单栏Build Artifacts

 点击build

 生成的jar在项目目录下面有个out目录

至此,flink jar程序就写好了,可以把jar丢到flink上运行了 

来源地址:https://blog.csdn.net/letterss/article/details/131128378

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Flink CDC 实时mysql到mysql

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

基于Flink CDC实时同步数据(MySQL到MySQL)

一、环境 jdk8Flink 1.16.1(部署在远程服务器:192.168.137.99)Flink CDC 2.3.0MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 二、准备 准备三个数据库:flink_
2023-08-16

MySQL Flink实时流处理的核心技术之窗口机制

Flink是一款流处理框架,窗口机制是其核心技术之一。Flink的窗口机制可以将无限的数据流划分为有限的窗口,并对窗口内的数据进行处理。Flink的窗口机制支持时间、计数、会话等多种窗口类型,并且可以在不同的窗口之间进行流转换和数据聚合,是实时流处理中非常重要的技术
2023-05-19

编程热搜

目录