我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Oracle同步数据到kafka的方法

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Oracle同步数据到kafka的方法

环境准备

软件准备

  • CentOS Linux 7.6.1810 (2台,A主机,B主机)
  • Oracle 11.2.0.4(A主机安装)
  • Kafka 2.13-2.6.0 (B主机安装)
  • kafka-connect-oracle-master (B主机安装,开源程序,用于同步Oracle数据到kafka)
  • apache-maven 3.6.3 (B主机安装,kafka-connect-oracle-master 的打包工具)
  • jdk-8u261-linux-x64.rpm (B主机安装)

下载地址

  • Cenos 和Oracle 和 JDK 自行到官网下载
  • Kafka http://kafka.apache.org/downloads

  • Kafka Connect Oracle    https://github.com/tianxiancode/kafka-connect-oracle
  • apache-maven 3.6.3 http://maven.apache.org/download.cgi

实施过程

Oracle主机(A)配置

Oracle实例配置项:

  • 开启归档日志
  • 开启附加日志
  • 创建kafka-connect-oracle-master连接用户
  • 创建测试数据生成用户及测试表
--开启归档日志
sqlplus / as sysdba    
SQL>shutdown immediate
SQL>startup mount
SQL>alter database archivelog;
SQL>alter database open;
--开启附加日志
SQL>alter database add supplemental log data (all) columns;
--创建kafka-connect-oracle-master连接用户
create role logmnr_role;
grant create session to logmnr_role;
grant  execute_catalog_role,select any transaction ,select any dictionary to logmnr_role;
create user kminer identified by kminerpass;
grant  logmnr_role to kminer;
alter user kminer quota unlimited on users;
--创建测试数据生成用户及测试表
create tablespace test_date datafile '/u01/app/oracle/oradata/zzclass="lazy" data-src/test_date01.dbf' size 100M autoextend on next 10M;
create user whtest identified by whtest default tablespace test_date;
grant connect,resource to whtest;
grant execute on dbms_scheduler to whtest;
grant execute on dbms_random to whtest;
grant   create  job  to  whtest;
create table t1 (
id int ,
name char(10),
createtime date default sysdate
);
alter table WHTEST.T1  add constraint PK_ID_T1 primary key (ID)  using index   tablespace TEST_DATE;

create table t2 (
id int ,
name char(10),
createtime date default sysdate
);
alter table WHTEST.T2  add constraint PK_ID_T2 primary key (ID)  using index   tablespace TEST_DATE;
create table t3 (
id int ,
name char(10),
createtime date default sysdate
);
alter table WHTEST.T3  add constraint PK_ID_T3 primary key (ID)  using index   tablespace TEST_DATE;
begin
dbms_scheduler.create_job(
job_name=> 't1_job',
job_type=> 'PLSQL_BLOCK',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t1 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t1 every 5 sec');
end;
/ 

begin
dbms_scheduler.create_job(
job_name=> 't2_job',
job_type=> 'PLSQL_BLOCK',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t2 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t1 every 5 sec');
end;
/ 

begin
dbms_scheduler.create_job(
job_name=> 't3_job',
job_type=> 'PLSQL_BLOCK',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t3 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t3 every 5 sec');
end;
/ 
--JOB创建之后,暂时先diable,待kafka配置完成之后再enable
exec DBMS_SCHEDULER.DISABLE('T1_JOB');
exec DBMS_SCHEDULER.DISABLE('T2_JOB');
exec DBMS_SCHEDULER.DISABLE('T3_JOB');

exec DBMS_SCHEDULER.ENABLE('T1_JOB');
exec DBMS_SCHEDULER.ENABLE('T2_JOB');
exec DBMS_SCHEDULER.ENABLE('T3_JOB');

Kafka主机(B)配置

​ 将下载好的Kafka 2.13-2.6.0 、kafka-connect-oracle-master、apache-maven 3.6.3、JDK 1.8.0上传至B主机/soft目录待使用。

主机hosts文件添加解析

[root@softdelvily ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.20.44 softdelvily localhost

安装JDK

[root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm 
warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY
Preparing...                          ################################# [100%]
Updating / installing...
   1:jdk1.8-2000:1.8.0_261-fcs        ################################# [100%]
Unpacking JAR files...
        tools.jar...
        plugin.jar...
        javaws.jar...
        deploy.jar...
        rt.jar...
        jsse.jar...
        charsets.jar...
        localedata.jar...

配置apache-maven工具

​ 将apache-maven-3.6.3-bin.tar.gz解压至/usr/local目录,并设置相应的/etc/profile环境变量。

[root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
apache-maven-3.6.3/README.txt
apache-maven-3.6.3/LICENSE
.....
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root  99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 2 root root   6 Apr 11  2018 bin
.....
[root@softdelvily local]# vi /etc/profile
.......
##添加如下环境变量
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
[root@softdelvily local]# source /etc/profile
[root@softdelvily local]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/apache-maven-3.6.3
Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"

配置Kafka 2.13-2.6.0

​ 解压Kafka 2.13-2.6.0 至/usr/local目录。

[root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
kafka_2.13-2.6.0/
kafka_2.13-2.6.0/LICENSE
......
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0
.....

​ 开启kafka,并创建对应同步数据库过的topic

--1、session1 启动ZK
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
[2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties 
.......
[2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
--2、session2 启动kafka
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties
--3、session3 创建cdczztar
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
Created topic cdczztar.
[root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
cdczztar

配置kafka-connect-oracle-maste

解压kafka-connect-oracle-master至/soft目录,并配置相应config文件,然后使用maven工具编译程序。

--解压zip包
[root@softdelvily soft]# unzip kafka-connect-oracle-master.zip 
[root@softdelvily soft]# ll
total 201180
-rw-r--r--. 1 root root   9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
-rw-r--r--. 1 root root 127431820 Sep  8 10:43 jdk-8u261-linux-x64.rpm
-rw-r--r--. 1 root root  65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz
drwxr-xr-x. 5 root root       107 Sep  8 15:48 kafka-connect-oracle-master
-rw-r--r--. 1 root root   3522729 Sep 22 14:14 kafka-connect-oracle-master.zip
[root@softdelvily soft]# cd kafka-connect-oracle-master/config/
[root@softdelvily config]# ll
total 4
-rw-r--r--. 1 root root 1135 Sep  8 15:48 OracleSourceConnector.properties
--调整properties配置文件
--需要调整项db.name.alias、topic、db.name、db.hostname、db.user、db.user.password、table.whitelist、table.blacklist信息,具体说明参考README.md
[root@softdelvily config]# vi OracleSourceConnector.properties 
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=WHTEST.T1,WHTEST.T2
table.blacklist=WHTEST.T3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
--编译程序
[root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
[root@softdelvily kafka-connect-oracle-master]# mvn clean package
[INFO] Scanning for projects...
.......
[INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  94.03 s
[INFO] Finished at: 2020-09-23T10:25:52+08:00
[INFO] ------------------------------------------------------------------------

​ 将如下文件复制到kafka工作目录。

  • 复制kafka-connect-oracle-1.058.jar 和 lib/ojdbc7.jar 到$KAFKA_HOME/lib
  • 复制config/OracleSourceConnector.properties 文件到$KAFKA_HOME/config
[root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
[root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/lib
[root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
[root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/

启动kafka-connect-oracle

[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
......
(com.ecer.kafka.connect.oracle.OracleSourceTask:187)
[2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)

启动kafka消费者

[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar

启动数据库JOB

[oracle@oracle01 ~]$ sqlplus / as sysdba

SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020

Copyright (c) 1982, 2011, Oracle.  All rights reserved.

set pagesize 999

Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options

SQL> SQL> conn whtest/whtest
Connected.
SQL> exec DBMS_SCHEDULER.ENABLE('T1_JOB');

PL/SQL procedure successfully completed.

kafka消费者界面

出现类似记录,表明同步成功,数据以key:value的形式输出。

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"SCN":2847668,"SEG_OWNER":"WHTEST","TABLE_NAME":"T1","TIMESTAMP":1600829206000,"SQL_REDO":"insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION":"INSERT","data":{"ID":5.57005146E8,"NAME":"533888119","CREATETIME":1600829206000},"before":null}}

到此这篇关于Oracle同步数据到kafka的文章就介绍到这了,更多相关Oracle同步数据到kafka内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Oracle同步数据到kafka的方法

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

详解Flink同步Kafka数据到ClickHouse分布式表

目录引言什么是ClickHouse?创建复制表通过jdbc写入引言业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定ClickHouse什么是ClickHouse?ClickHouse是
2022-12-01

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表 package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEn
2023-08-30

oracle主备数据同步的方法是什么

Oracle主备数据同步的方法通常有以下几种:1. 归档日志传送:主数据库将归档日志传送给备份数据库,备份数据库按照日志的顺序进行应用,实现数据同步。2. 实时重做日志传送:主数据库将实时重做日志传送给备份数据库,备份数据库实时应用重做日志
2023-09-25

ODBC连接Oracle实现数据同步的方法

首先,确保你已经在Oracle数据库中创建了一个ODBC数据源。这可以通过在控制面板的ODBC数据源管理器中创建一个新的数据源来实现。然后,你需要在你的应用程序中设置ODBC连接。这通常涉及到使用ODBC连接字符串来指定连接的参数,比如数据
ODBC连接Oracle实现数据同步的方法
2024-07-15

oracle增量数据同步的方法是什么

Oracle增量数据同步的方法有多种,常见的包括以下几种:使用物化视图(Materialized View):物化视图是基于查询定义的一种逻辑结构,可以将查询结果存储在物理表中。通过使用物化视图,可以在源数据库和目标数据库之间创建一个或多个
oracle增量数据同步的方法是什么
2024-04-09

MySQL数据同步到Doris的四种方式

MySQL数据同步到Doris有四种方式:Binlog同步:高吞吐、低延迟,但需安装Connector,可能影响MySQL性能。CDC同步:更低延迟,支持更多MySQL版本,但需要MySQL支持CDC。SQL联邦查询:无需数据复制,查询性能高,但仅支持读取。离线数据导入:简单易用,适用于大规模数据传输,但延迟较高,需手动操作。选择方式取决于一致性要求、延迟要求、MySQL版本和支持、性能影响和运营成本。
MySQL数据同步到Doris的四种方式
2024-04-02

oracle数据库怎么同步到另一个数据库

要将Oracle数据库同步到另一个数据库,可以使用以下方法:1. 数据库备份和还原:在源数据库中进行全量备份,然后将备份文件复制到目标数据库,并在目标数据库中进行还原。这种方法适用于需要完全覆盖目标数据库的情况。2. 数据库复制:使用Ora
2023-09-17

浅谈MySQL数据同步到Redis缓存的几种方法

本文主要介绍了浅谈MySQL数据同步到Redis缓存的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-03-19

浅谈MySQL数据同步到 Redis 缓存的几种方法

目录1 mysql查完数据,再同步写入到Redis中2 Mysql查完数据,通过发送MQ,在消费者线程去同步Redis3 订阅Mysql的Binlog文件(可借助Canal来进行)4 延迟双删5 延迟双写6 总结1 Mysql查完数据,再同
2023-03-19

序列化数据同步到Redis中的方法是什么

序列化数据同步到Redis中的方法有以下几种:使用Redis自带的数据结构:Redis支持的数据结构包括字符串、列表、集合、有序集合、哈希表等,可以直接将数据序列化后存储到这些数据结构中。使用Redis的持久化功能:Redis提供了RDB持
序列化数据同步到Redis中的方法是什么
2024-04-29

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录