如何使用Flink CDC实现 Oracle数据库数据同步
前言
Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式实现数据库同步,同时也提供了Flink CDC Source Connector API。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
本文通过flink-connector-oracle-cdc来实现Oracle数据库的数据同步。
一、开启归档日志
1)数据库服务器终端,使用sysdba角色连接数据库
sqlplus / as sysdba
或
sqlplus /nolog
CONNECT sys/password AS SYSDBA;
2)检查归档日志是否开启
archive log list;
(“Database log mode: No Archive Mode”,日志归档未开启)
(“Database log mode: Archive Mode”,日志归档已开启)
3)启用归档日志
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
注意:
启用归档日志需要重启数据库。
归档日志会占用大量的磁盘空间,应定期清除过期的日志文件
4)启动完成后重新执行 archive log list; 查看归档打开状态
二、创建flinkcdc专属用户
2.1 对于Oracle 非CDB数据库,执行如下sql
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
2.2 对于Oracle CDB数据库,执行如下sql
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
GRANT LOGMINING TO flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
三、指定oracle表、库级启用
-- 指定表启用补充日志记录:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 为数据库的所有表启用
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 指定数据库启用补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
四、使用flink-connector-oracle-cdc实现数据库同步
4.1 引入pom依赖
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.4.0</version>
</dependency>
4.2 Java主代码
package test.datastream.cdc.oracle;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;
import java.util.Properties;
public class OracleCdcExample {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
//数字类型数据 转换为字符
properties.setProperty("decimal.handling.mode", "string");
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
// .startupOptions(StartupOptions.latest()) // 从最晚位点启动
.url("jdbc:oracle:thin:@localhost:1521:orcl")
.port(1521)
.database("ORCL") // monitor XE database
.schemaList("C##flink_user") // monitor inventory schema
.tableList("c##flink_user.TEST2") // monitor products table
.username("c##flink_user")
.password("flinkpw")
.debeziumProperties(properties)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);// use parallelism 1 for sink to keep message ordering
SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());
SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new CacheDataAllWindowFunction());
//批量同步
winStream.addSink(new DbCdcSinkFunction(null));
env.execute();
}
}
4.3json转换为row
package test.datastream.cdc.oracle.function;
import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
private Map<String,Integer> columnMap =new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
columnMap.put("ID",0);
columnMap.put("NAME",1);
columnMap.put("DESCRIPTION",2);
columnMap.put("AGE",3);
columnMap.put("CREATE_TIME",4);
columnMap.put("SCORE",5);
columnMap.put("C_1",6);
columnMap.put("B_1",7);
}
@Override
public void flatMap(String s, Collector<Row> collector) throws Exception {
System.out.println("receive: "+s);
VsConfiguration conf=VsConfiguration.from(s);
String op = conf.getString(CdcConstants.K_OP);
VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
Row row =null;
if(CdcConstants.OP_C.equals(op)){
//插入,使用after数据
row = convertToRow(after);
row.setKind(RowKind.INSERT);
}else if(CdcConstants.OP_U.equals(op)){
//更新,使用after数据
row = convertToRow(after);
row.setKind(RowKind.UPDATE_AFTER);
}else if(CdcConstants.OP_D.equals(op)){
//删除,使用before数据
row = convertToRow(before);
row.setKind(RowKind.DELETE);
}else {
//r 操作,使用after数据
row = convertToRow(after);
row.setKind(RowKind.INSERT);
}
collector.collect(row);
}
private Row convertToRow(VsConfiguration data){
Set<String> keys = data.getKeys();
int size = keys.size();
Row row=new Row(8);
int i=0;
for (String key:keys) {
Integer index = this.columnMap.get(key);
Object value=data.get(key);
if(key.equals("CREATE_TIME")){
//long日期转timestamp
value=long2Timestamp((Long)value);
}
row.setField(index,value);
}
return row;
}
private static java.sql.Timestamp long2Timestamp(Long time){
Timestamp timestamp = new Timestamp(time/1000);
System.out.println(timestamp);
return timestamp;
}
}
到此这篇关于使用Flink CDC实现 Oracle数据库数据同步的文章就介绍到这了,更多相关Flink CDC Oracle数据同步内容请搜索编程网(www.lsjlt.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网(www.lsjlt.com)!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341