我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。

通过内置的Flink CDC,连接器可以直接将上游源的表模式和数据同步到Apache Doris,这意味着用户不再需要编写DataStream程序或在Doris中预先创建映射表。

当 Flink 作业启动时,Connector 会自动检查源数据库和 Apache Doris 之间的数据等效性。如果数据源包含 Doris 中不存在的表,Connector 会自动在 Doris 中创建相同的表,并利用 Flink 的侧输出来方便一次摄取多个表;如果源中发生架构更改,它将自动获取 DDL 语句并在 Doris 中进行相同的架构更改。
 

一、快速开始

  • 对于MySQL:

下载 JAR 文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0


行家:

 org.apache.doris flink-doris-connector-1.15   1.4.0
  • 对于Oracle:

下载 JAR 文件:
Flink 1.15http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.15-1.5.0-SNAPSHOT.jar
Flink 1.16http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar
Flink 1.17http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.17-1.5.0-SNAPSHOT.jar


如何使用它

例如,要将整个 MySQL 数据库引入mysql_dbDoris(MySQL 表名以tbl或test开头),只需执行以下命令(无需提前在Doris 中创建表):

/bin/flink run \   -Dexecution.checkpointing.interval=10s \   -Dparallelism.default=1 \   -c org.apache.doris.flink.tools.cdc.CdcTools \  lib/flink-doris-connector-1.16-1.4.0.jar \  mysql-sync-database \   --database test_db \   --mysql-conf hostname=127.0.0.1 \   --mysql-conf username=root \   --mysql-conf password=123456 \   --mysql-conf database-name=mysql_db \   --including-tables "tbl|test.*" \   --sink-conf fenodes=127.0.0.1:8030 \   --sink-conf username=root \   --sink-conf password=123456 \   --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \   --sink-conf sink.label-prefix=label1 \   --table-conf replication_num=1

摄取Oracle数据库:请参考示例代码(https://github.com/apache/doris-flink-connector/pull/156)。


表现如何

当涉及到同步整个数据库(包含数百甚至数千个活动或不活动的表)时,大多数用户希望在几秒钟内完成。因此我们测试了连接器,看看它是否符合要求:

  • 1000 个 MySQL 表,每个表有 100 个字段。所有表都是活动的(这意味着它们不断更新,每次数据写入涉及一百多行)

  • Flink作业检查点:10s

经过压力测试,系统表现出较高的稳定性,主要指标如下:

根据早期采用者的反馈,该Connector在生产环境中的万表数据库同步中也提供了高性能和系统稳定性。这证明Apache Doris和Flink CDC的结合能够高效可靠地进行大规模数据同步。

二、它如何使数据工程师受益

工程师不再需要担心表创建或表模式维护,从而节省了数天繁琐且容易出错的工作。之前在Flink CDC中,需要为每个表创建一个Flink作业,并在源端建立日志解析链路,但现在通过全库摄取,源数据库的资源消耗大大减少。也是增量更新和全量更新的统一解决方案。

其他特性

连接维度表和事实表

常见的做法是将维度表放在Doris中,通过Flink的实时流进行Join查询。Flink-Doris-Connector 1.4.0基于Flink 的 Async I/O实现了异步 Lookup Join,因此 Flink 实时流不会因为查询而阻塞。此外,连接器还允许您将多个查询合并为一个大查询,并将其立即发送给 Doris 进行处理。这提高了此类连接查询的效率和吞吐量。

节俭 SDK

我们在 Connector 中引入了 Thrift-Service SDK,用户不再需要使用 Thrift 插件或在编译时配置 Thrift 环境。这使得编译过程变得更加简单。

按需流加载

数据同步过程中,当没有新的数据摄入时,不会发出Stream Load请求。这样可以避免不必要的集群资源消耗。

4、后端节点轮询

对于数据摄取,Doris 调用前端节点获取后端节点列表,并随机选择一个发起摄取请求。该后端节点将是协调器。Flink-Doris-Connector 1.4.0 允许用户启用轮询机制,即在每个Flink 检查点都有不同的后端节点作为 Coordinator,以避免单个后端节点长期承受过大的压力。

支持更多数据类型

除了常见的数据类型外,Flink-Doris-Connector 1.4.0 还支持 Doris 中的 DecimalV3/DateV2/DateTimev2/Array/JSON。


三、用法示例

可以通过DataStream或FlinkSQL(有界流)从Doris读取数据。支持谓词下推。

CREATE TABLE flink_doris_source (   name STRING,   age INT,   score DECIMAL(5,2)  )    WITH (     'connector' = 'doris',     'fenodes' = '127.0.0.1:8030',     'table.identifier' = 'database.table',     'username' = 'root',     'password' = 'password',     'doris.filter.query' = 'age=18');​SELECT * FROM flink_doris_source;


连接维度表和事实表:

CREATE TABLE fact_table ( `id` BIGINT, `name` STRING, `city` STRING, `process_time` as proctime()) WITH ( 'connector' = 'kafka',...);​create table dim_city( `city` STRING, `level` INT , `province` STRING, `country` STRING) WITH ( 'connector' = 'doris', 'fenodes' = '127.0.0.1:8030', 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', 'lookup.jdbc.async' = 'true', 'table.identifier' = 'dim.dim_city', 'username' = 'root', 'password' = '');​SELECT a.id, a.name, a.city, c.province, c.country,c.level FROM fact_table aLEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS cON a.city = c.city


写入Apache Doris:

CREATE TABLE doris_sink (   name STRING,   age INT,   score DECIMAL(5,2)  )    WITH (     'connector' = 'doris',     'fenodes' = '127.0.0.1:8030',     'table.identifier' = 'database.table',     'username' = 'root',     'password' = '',     'sink.label-prefix' = 'doris_label',     //json write in     'sink.properties.format' = 'json',     'sink.properties.read_json_by_line' = 'true');

来源地址:https://blog.csdn.net/dashujuedu/article/details/132733323

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

使用Apache SeaTunnel进行数据库同步(MySQL to MySQL)

Apache SeaTunnel 起到的主要作用是什么? 目前,大数据体系里有各种各样的数据引擎,有大数据生态的 Hadoop、Hive、Kudu、Kafka、HDFS,也有泛大数据库体系的 MongoDB、Redis、ClickHouse
2023-08-17

如何使用MySQL进行跨数据库的数据同步?

如何使用MySQL进行跨数据库的数据同步?在现代的软件开发中,数据库的使用无处不在。而随着软件项目的增长,数据的同步和备份变得越来越重要。MySQL是一个强大的关系型数据库管理系统,同时也提供了一些可靠的方法来实现跨数据库的数据同步。本文将
2023-10-22

Centos7中MySQL数据库使用mysqldump进行每日自动备份

数据库的备份,对于生产环境来说尤为重要,数据库的备份分为物理备份和逻辑备份。我们将使用mysqldump命令进行数据备份。使用自动任务进行每日备份。一、需求说明:数据库的备份,对于生产环境来说尤为重要,数据库的备份分为物理备份和逻辑备份。物理备份:使用相关的复
Centos7中MySQL数据库使用mysqldump进行每日自动备份
2014-07-08

使用shell脚本每天对MySQL多个数据库自动备份的示例分析

这篇文章主要介绍了使用shell脚本每天对MySQL多个数据库自动备份的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。Linux下使用shell脚本,结合cronta
2023-06-09

Centos7中MySQL数据库怎么使用mysqldump进行每日自动备份的编写

这篇文章主要介绍“Centos7中MySQL数据库怎么使用mysqldump进行每日自动备份的编写”,在日常操作中,相信很多人在Centos7中MySQL数据库怎么使用mysqldump进行每日自动备份的编写问题上存在疑惑,小编查阅了各式资
2023-06-20

编程热搜

目录