我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何使用Flink CDC实现 Oracle数据库数据同步

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何使用Flink CDC实现 Oracle数据库数据同步

前言

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式实现数据库同步,同时也提供了Flink CDC Source Connector API。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
本文通过flink-connector-oracle-cdc来实现Oracle数据库的数据同步。

一、开启归档日志

1)数据库服务器终端,使用sysdba角色连接数据库

 sqlplus / as sysdba
或
sqlplus /nolog
CONNECT sys/password AS SYSDBA;

2)检查归档日志是否开启

archive log list;

(“Database log mode: No Archive Mode”,日志归档未开启)
(“Database log mode: Archive Mode”,日志归档已开启)
3)启用归档日志

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

注意:
启用归档日志需要重启数据库。
归档日志会占用大量的磁盘空间,应定期清除过期的日志文件
4)启动完成后重新执行 archive log list; 查看归档打开状态

二、创建flinkcdc专属用户

2.1 对于Oracle 非CDB数据库,执行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT ANALYZE ANY TO flinkuser;
  GRANT CREATE TABLE TO flinkuser;
  -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

2.2 对于Oracle CDB数据库,执行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
  GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
  GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
  GRANT LOGMINING TO flinkuser CONTAINER=ALL;
  GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
  -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
  GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;

三、指定oracle表、库级启用

-- 指定表启用补充日志记录:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 为数据库的所有表启用
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 指定数据库启用补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

四、使用flink-connector-oracle-cdc实现数据库同步

4.1 引入pom依赖

 <dependency>
     <groupId>com.ververica</groupId>
     <artifactId>flink-connector-oracle-cdc</artifactId>
     <version>2.4.0</version>
 </dependency>

4.2 Java主代码

package test.datastream.cdc.oracle;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;
import java.util.Properties;
public class OracleCdcExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        //数字类型数据 转换为字符
        properties.setProperty("decimal.handling.mode", "string");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
//                .startupOptions(StartupOptions.latest()) // 从最晚位点启动
                .url("jdbc:oracle:thin:@localhost:1521:orcl")
                .port(1521)
                .database("ORCL") // monitor XE database
                .schemaList("C##flink_user") // monitor inventory schema
                .tableList("c##flink_user.TEST2") // monitor products table
                .username("c##flink_user")
                .password("flinkpw")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);// use parallelism 1 for sink to keep message ordering
        SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());
        SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CacheDataAllWindowFunction());
		//批量同步
        winStream.addSink(new DbCdcSinkFunction(null));
        env.execute();
    }
}

4.3json转换为row

package test.datastream.cdc.oracle.function;
import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
    private Map<String,Integer> columnMap =new HashMap<>();
    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID",0);
        columnMap.put("NAME",1);
        columnMap.put("DESCRIPTION",2);
        columnMap.put("AGE",3);
        columnMap.put("CREATE_TIME",4);
        columnMap.put("SCORE",5);
        columnMap.put("C_1",6);
        columnMap.put("B_1",7);
    }
    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: "+s);
        VsConfiguration conf=VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
        Row row =null;
        if(CdcConstants.OP_C.equals(op)){
            //插入,使用after数据
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }else if(CdcConstants.OP_U.equals(op)){
            //更新,使用after数据
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        }else if(CdcConstants.OP_D.equals(op)){
            //删除,使用before数据
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        }else {
            //r 操作,使用after数据
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }
    private Row convertToRow(VsConfiguration data){
        Set<String> keys = data.getKeys();
        int size = keys.size();
        Row row=new Row(8);
        int i=0;
        for (String key:keys) {
            Integer index = this.columnMap.get(key);
            Object value=data.get(key);
            if(key.equals("CREATE_TIME")){
                //long日期转timestamp
                value=long2Timestamp((Long)value);
            }
            row.setField(index,value);
        }
        return row;
    }
    private static  java.sql.Timestamp long2Timestamp(Long time){
        Timestamp timestamp = new Timestamp(time/1000);
        System.out.println(timestamp);
        return timestamp;
    }
}

到此这篇关于使用Flink CDC实现 Oracle数据库数据同步的文章就介绍到这了,更多相关Flink CDC Oracle数据同步内容请搜索编程网(www.lsjlt.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网(www.lsjlt.com)!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何使用Flink CDC实现 Oracle数据库数据同步

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何使用Flink CDC实现 Oracle数据库数据同步

目录前言一、开启归档日志二、创建flinkcdc专属用户2.1 对于oracle 非CDB数据库,执行如下sql2.2 对于Oracle CDB数据库,执行如下sql三、指定oracle表、库级启用四、使用flink-connector-o
如何使用Flink CDC实现 Oracle数据库数据同步
2024-08-21

基于Flink CDC实时同步数据(MySQL到MySQL)

一、环境 jdk8Flink 1.16.1(部署在远程服务器:192.168.137.99)Flink CDC 2.3.0MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 二、准备 准备三个数据库:flink_
2023-08-16

mysql数据库同步如何实现

MySQL数据库同步可以通过多种方式实现,以下是一些常用的方法:使用主从复制:主从复制是MySQL自带的一种数据库同步方法,通过将主数据库的变更日志传输到从数据库,从数据库可以实时同步主数据库的数据。这种方法适用于需要实时同步数据的场景。使
mysql数据库同步如何实现
2024-04-09

python如何实现不同数据库间数据同步功能

这篇文章主要为大家展示了python如何实现不同数据库间数据同步功能,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带大家一起来研究并学习一下“python如何实现不同数据库间数据同步功能”这篇文章吧。python是什么意思P
2023-06-06

如何使用MySQL进行跨数据库的数据同步?

如何使用MySQL进行跨数据库的数据同步?在现代的软件开发中,数据库的使用无处不在。而随着软件项目的增长,数据的同步和备份变得越来越重要。MySQL是一个强大的关系型数据库管理系统,同时也提供了一些可靠的方法来实现跨数据库的数据同步。本文将
2023-10-22

Oracle数据库ODBC连接与数据仓库的实时数据同步

要实现Oracle数据库ODBC连接与数据仓库的实时数据同步,可以使用以下步骤:首先,确保你已经安装了Oracle数据库ODBC驱动程序,并且已经配置好了ODBC数据源连接到Oracle数据库。确保你的数据仓库支持ODBC连接,并且有相应的
Oracle数据库ODBC连接与数据仓库的实时数据同步
2024-07-16

redis如何同步数据库

同步 redis 与数据库包括以下步骤:1. 选择同步机制(redis sentinel、redis cluster 或外部脚本);2. 设置 redis(启用 rdb 持久化、配置从库);3. 设置数据库(创建表、定义触发器);4. 配置
redis如何同步数据库
2024-06-12

阿里云数据库同步数据轻松实现跨平台数据同步

随着互联网技术的不断发展,数据的同步需求日益增长。尤其是在跨平台应用开发中,如何实现不同平台之间的数据同步,成为了开发者需要解决的重要问题。本文将介绍阿里云数据库同步数据的相关功能和使用方法,帮助您轻松实现跨平台数据同步。一、阿里云数据库同步数据阿里云数据库同步数据是一种能够实现跨平台数据同步的技术。通过使用阿里
阿里云数据库同步数据轻松实现跨平台数据同步
2023-11-04

如何使用Redis实现分布式数据同步

如何使用Redis实现分布式数据同步随着互联网技术的发展和应用场景的日益复杂,分布式系统的概念越来越被广泛采用。在分布式系统中,数据同步是一个重要的问题。Redis作为一个高性能的内存数据库,不仅可以用来存储数据,还可以用来实现分布式数据同
如何使用Redis实现分布式数据同步
2023-11-07

MySQL同步数据Replication如何实现

今天小编给大家分享一下MySQL同步数据Replication如何实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。MySQ
2023-07-05

Java实现同步枚举类数据到数据库

这篇文章主要为大家详细介绍了Java实现同步枚举类数据到数据库,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
2022-11-13

编程热搜

目录