Flink CDC: real-time MySQL-to-MySQL synchronization
CDC stands for Change Data Capture. In the broad sense, any technique that can capture data changes can be called CDC. As the term is commonly used today, CDC refers to techniques for capturing data changes in a database.
1. The MySQL CDC connector requires binlog to be enabled on the source MySQL instance. Locate my.cnf and add the following under the [mysqld] section:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row
Restart the database for the changes to take effect.
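To confirm that binlog is actually enabled, you can query the server variables. Below is a minimal JDBC sketch (the URL, user and password are placeholders for your own environment, and the class name BinlogCheck is made up for this example):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // adjust the URL and credentials to your environment
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/?serverTimezone=UTC&useSSL=false", "root", "admin");
             Statement stmt = conn.createStatement()) {
            // expect log_bin = ON and binlog_format = ROW after the change above
            for (String name : new String[]{"log_bin", "binlog_format", "server_id"}) {
                try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE '" + name + "'")) {
                    if (rs.next()) {
                        System.out.println(rs.getString(1) + " = " + rs.getString(2));
                    }
                }
            }
        }
    }
}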
2. Create a Spring Boot project and add the dependencies below to the pom:
<properties>
    <java.version>1.8</java.version>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
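Because the maven-shade-plugin is bound to the package phase here, running mvn package from the command line should also produce a self-contained (fat) jar, which can serve as an alternative to the IDEA artifact approach described in step 3 below.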
Flink CDC code for MySQL-to-MySQL synchronization:
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // restart at most 3 times with a 5s delay, and checkpoint every 5s with exactly-once semantics
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // create the Table environment (Blink planner, streaming mode)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // register the source table; the source connector must be mysql-cdc
        tEnv.executeSql("create table sourceTable(id bigint, organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR, PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'database-name' = 'quarant_db',\n" +
                "  'table-name' = 'organization_info',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'admin'\n" +
                ")");
        // Table result = tEnv.sqlQuery("SELECT id, name,card_num,phone,address FROM quarantine");
        // tEnv.registerTable("sourceTable",result);

        // register the target table; the sink connector is jdbc
        tEnv.executeSql("create table targetTable(id bigint, organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR, PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
                "  'table-name' = 'organization_info',\n" +
                "  'username' = 'root',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'password' = 'admin'\n" +
                ")");

        // run the CDC pipeline: continuously copy the source table into the target table
        String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
        tEnv.executeSql(query).print();
    }
}
Run the main method. Flink synchronizes the existing source table data to the target table, and any subsequent inserts, updates and deletes on the source table are replicated to the target table in real time.
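Note that the jdbc sink does not create the target table, so organization_info must already exist in testdb with matching columns. To check the pipeline end to end, a small JDBC sketch like the one below can be used: it writes a row into the source table and polls the target table until the row appears. The id 9999 and the class name SyncCheck are arbitrary, and the URLs and credentials reuse the values from the job above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SyncCheck {
    public static void main(String[] args) throws Exception {
        String sourceUrl = "jdbc:mysql://localhost:3306/quarant_db?serverTimezone=UTC&useSSL=false";
        String targetUrl = "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useSSL=false";

        // insert a test row into the source table; the id 9999 is arbitrary
        try (Connection src = DriverManager.getConnection(sourceUrl, "root", "admin");
             PreparedStatement insert = src.prepareStatement(
                     "INSERT INTO organization_info (id, organization_code, organization_name, parent_code, parent_name) " +
                     "VALUES (?, ?, ?, ?, ?)")) {
            insert.setLong(1, 9999L);
            insert.setString(2, "T001");
            insert.setString(3, "test org");
            insert.setString(4, "P001");
            insert.setString(5, "parent org");
            insert.executeUpdate();
        }

        // poll the target table for up to ~30 seconds until the row has been replicated
        try (Connection dst = DriverManager.getConnection(targetUrl, "root", "admin")) {
            for (int i = 0; i < 30; i++) {
                try (PreparedStatement query = dst.prepareStatement(
                        "SELECT organization_name FROM organization_info WHERE id = ?")) {
                    query.setLong(1, 9999L);
                    try (ResultSet rs = query.executeQuery()) {
                        if (rs.next()) {
                            System.out.println("row replicated: " + rs.getString(1));
                            return;
                        }
                    }
                }
                Thread.sleep(1000);
            }
            System.out.println("row not found in the target table yet");
        }
    }
}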
3. Package the program as a Flink jar
In IDEA, press Ctrl+Alt+Shift+S to open Project Structure and go to Artifacts -> JAR,
select the main class and click OK.
Then open Build -> Build Artifacts from the menu bar
and click Build.
The generated jar is placed in the out directory under the project root.
At this point the Flink jar program is done and can be submitted to a Flink cluster to run.
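For example, on a standalone cluster the job could be submitted with the Flink CLI as ./bin/flink run -c FlinkMysqlToMysql path/to/your-job.jar (the jar name depends on your build), or uploaded through the Flink web UI.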
Source: https://blog.csdn.net/letterss/article/details/131128378