我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

环境说明:

flink 1.15.2

Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

mysql 版本:5.7

windows11 IDEA 本地运行

先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation

Oracle 开启 log archiving

(1).启用 log archiving
        a:以DBA用户连接数据库 
             sqlplus / as sysdba
        b:启用 log archiving (会重启数据库)
             alter system set db_recovery_file_dest_size = 10G;
             alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
             shutdown immediate;
             startup mount;
             alter database archivelog;
             alter database open;
        c:检查 log archiving 是否开启  -- Should now "Database log mode: Archive Mode"
             archive log list;

    (2).注:必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。下面演示了如何在表/数据库级别上配置它。
        为一个特定的表启用补充日志记录:修改表目录。客户添加补充日志数据(所有)列;
            ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

        —为数据库启用补充日志修改数据库添加补充日志数据;
            ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

    (3).创建具有权限的Oracle用户
        a:创建表空间
            sqlplus / as sysdba
              CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
              exit;
          b:创建用户并赋权  flinkuser  flinkpw 
            sqlplus / as sysdba
              CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
              GRANT CREATE SESSION TO flinkuser;
              GRANT SET CONTAINER TO flinkuser;
              GRANT SELECT ON V_$DATABASE to flinkuser;
              GRANT FLASHBACK ANY TABLE TO flinkuser;
              GRANT SELECT ANY TABLE TO flinkuser;
              GRANT SELECT_CATALOG_ROLE TO flinkuser;
              GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
              GRANT SELECT ANY TRANSACTION TO flinkuser;
              GRANT LOGMINING TO flinkuser;
            
              GRANT CREATE TABLE TO flinkuser;
              GRANT LOCK ANY TABLE TO flinkuser;
              GRANT ALTER ANY TABLE TO flinkuser;
              GRANT CREATE SEQUENCE TO flinkuser;
            
              GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
              GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
            
              GRANT SELECT ON V_$LOG TO flinkuser;
              GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
              GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
              GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
              GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
              GRANT SELECT ON V_$LOGFILE TO flinkuser;
              GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
              GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
              exit;

Oracle 建表,并配置补充日志

CREATE TABLE "USER_INFO" (    
ID NUMBER, 
USERNAME VARCHAR2(255), 
PASSWORD VARCHAR2(255), 
PRIMARY KEY (ID));

ALTER TABLE USER_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Mysql 建表

CREATE TABLE user_new (
  id int(11) NOT NULL,
  username varchar(255) DEFAULT NULL,
  password varchar(255) DEFAULT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Maven依赖

         8        8        1.15.2                            org.apache.flink            flink-clients            ${flink.version}                            org.apache.flink            flink-streaming-java            ${flink.version}                            org.apache.flink            flink-runtime-web            ${flink.version}                            org.apache.flink            flink-table-planner_2.12            ${flink.version}                                        org.apache.flink            flink-connector-jdbc            ${flink.version}                                                    mysql            mysql-connector-java            8.0.29                                        org.projectlombok            lombok            1.18.22                                    com.ververica            flink-sql-connector-mysql-cdc            2.3.0                                        org.apache.flink            flink-connector-jdbc            1.15.2                                                    org.apache.flink            flink-connector-base            ${flink.version}                                                com.ververica            flink-sql-connector-oracle-cdc            2.3.0                                        org.apache.logging.log4j            log4j-slf4j-impl            2.12.1                            org.slf4j            slf4j-simple            1.7.15                            org.apache.logging.log4j            log4j-core            2.17.2                                    org.apache.logging.log4j            log4j-api            2.17.2                                    log4j            log4j            1.2.9            

demo如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class OracleCdcToMysql {    public static void main(String[] args) {        //1.获取stream的执行环境        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        senv.setParallelism(1);        //2.创建表执行环境        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);        String sourceTable = "CREATE TABLE oracle_cdc_source " +                "( ID INT, " +                "USERNAME STRING, " +                "PASSWORD STRING, " +                "PRIMARY KEY(ID) NOT ENFORCED) WITH (\n" +                "'connector' = 'oracle-cdc',\n" +                "'hostname' = '1.1.1.1',\n" +                "'port' = '1521',\n" +                "'username' = 'flinkcdcuser',\n" +                "'password' = 'flinkpw',\n" +                "'database-name' = 'LMDB',\n" +//select name from v$database;                "'schema-name' = 'TEST',\n" +//select SYS_CONTEXT('USERENV','CURRENT_SCHEMA') CURRENT_SCHEMA from dual;                "'debezium.snapshot.mode' = 'schema_only',\n" +                //snapshot.mode = initial 快照包括捕获表的结构和数据。指定此值将用捕获表中数据的完整表示填充主题。                //snapshot.mode = schema_only 快照只包含捕获表的结构。如果希望连接器仅捕获快照之后发生的更改的数据,请指定此值。                "'scan.incremental.snapshot.enabled' = 'true',\n" +                //scan.incremental.snapshot.enabled 增量快照是一种读取表快照的新机制。增量快照与旧的快照机制相比有很多优点,包括:                // (1)在快照读取期间源可以并行;(2)在快照读取期间源可以在块粒度上执行检查点;(3)在快照读取之前源不需要获取ROW SHARE MODE锁。                "'scan.incremental.snapshot.chunk.size' = '8096' ,\n" +                //表快照的块大小(行数),当读取表快照时,捕获的表被分割成多个块。                "'scan.snapshot.fetch.size' = '1024',\n" +                //读取表快照时每个轮询的最大读取大小。                "'connect.max-retries' = '3',\n" +                //连接器应该重试构建Oracle数据库服务器连接的最大重试次数。                "'connection.pool.size'= '20',\n" +                //连接池大小                "'debezium.log.mining.strategy' = 'online_catalog',\n" +                //online_catalog -使用数据库的当前数据字典来解析对象id,并且不向在线重做日志中写入任何额外的信息。                // 这使得LogMiner的挖掘速度大大提高,但代价是无法跟踪DDL的变化。如果捕获的表模式很少或从不更改,那么这是理想的选择。                "'debezium.log.mining.archive.destination.name' = 'log_archive_dest_1',\n" +                "'debezium.log.mining.continuous.mine'='true'," +                "  'table-name' = 'USER_INFO'\n" +                ")";        tEnv.executeSql(sourceTable);//        tEnv.executeSql("select * from oracle_cdc_source").print(); //加上打印后,虽然可以实时看到增删改查记录,但是这些后续操作并不会插入到目标表。如果不加这句打印,则程序无问题        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +                "  ID INT,\n" +                "  USERNAME STRING,\n" +                "  PASSWORD STRING,\n" +                "PRIMARY KEY(ID) NOT ENFORCED\n" +                ") WITH (\n" +                "'connector' = 'jdbc',\n" +                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +                "'url' = 'jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true',\n" +                "'username' = 'flink_cdc_user',\n" +                "'password' = 'flink@cdc',\n"+                "  'table-name' = 'user_new',\n" +                "  'connection.max-retry-timeout' = '60s'\n" +                ")";        tEnv.executeSql(sinkTable);        tEnv.executeSql("insert into mysql_cdc_sink select ID,USERNAME,PASSWORD from oracle_cdc_source");    }}

本地运行控制台是不会输出什么提示的,不像mysql cdc 还可以看到一些查看binlog日志信息。你可以知道程序运行成功与否,Oracle的什么都不会输出。

下图是有打印的,但是只能打印,后续插表动作就失效了。如果不打印,那就是什么都没有。

 下图是mysqlCDC的,可以看到有连接,有读取binlog日志,并且还可以打印,后续插表也正常。

具体对应数据类型,还需查看官网,最下面有列出所有对应的数据类型。

具体可用参数,可查官网,也可查阿里介绍,毕竟这是阿里大大的。感觉阿里大大的参数类型更全,更多。具体如何使用,还需研究。MySQL_实时计算 Flink版-阿里云帮助中心

打包到集群运行--后续再补一篇吧,前面几篇都需要。单独补一篇。

来源地址:https://blog.csdn.net/qq_41875667/article/details/131390725

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

基于Flink CDC实时同步数据(MySQL到MySQL)

一、环境 jdk8Flink 1.16.1(部署在远程服务器:192.168.137.99)Flink CDC 2.3.0MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 二、准备 准备三个数据库:flink_
2023-08-16

编程热搜

目录