我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用Flink CDC将Mysql中的数据实时同步到ES

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用Flink CDC将Mysql中的数据实时同步到ES

前言

最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间……

我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过flink sql-client实现的,但这有个问题,当fink集群重启,JOB就没有了,没有办法通过savePointing来恢复。所以还是记录下。

代码

直接上代码:

public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(3000);    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/savepointings");        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        tableEnv.executeSql("CREATE TABLE orders (\n" +                "   order_id INT,\n" +                "   order_date TIMESTAMP(0),\n" +                "   customer_name STRING,\n" +                "   price DECIMAL(10, 5),\n" +                "   product_id INT,\n" +                "   order_status BOOLEAN,\n" +                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +                " ) WITH (\n" +                "   'connector' = 'mysql-cdc',\n" +                "   'hostname' = 'localhost',\n" +                "   'port' = '3306',\n" +                "   'username' = 'root',\n" +                "   'password' = '123456',\n" +                "   'database-name' = 'mydb',\n" +                "   'table-name' = 'orders'\n" +                " );").await();        tableEnv.executeSql("CREATE TABLE products (\n" +                "    id INT,\n" +                "    name STRING,\n" +                "    description STRING,\n" +                "    PRIMARY KEY (id) NOT ENFORCED\n" +                "  ) WITH (\n" +                "    'connector' = 'mysql-cdc',\n" +                "    'hostname' = 'localhost',\n" +                "    'port' = '3306',\n" +                "    'username' = 'root',\n" +                "    'password' = '123456',\n" +                "    'database-name' = 'mydb',\n" +                "    'table-name' = 'products'\n" +                "  );").await();        tableEnv.executeSql("CREATE TABLE enriched_orders (\n" +                "   order_id INT,\n" +                "   order_date TIMESTAMP(0),\n" +                "   customer_name STRING,\n" +                "   price DECIMAL(10, 5),\n" +                "   product_id INT,\n" +                "   order_status BOOLEAN,\n" +                "   product_name STRING,\n" +                "   product_description STRING,\n" +                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +                " ) WITH (\n" +                "     'connector' = 'elasticsearch-7',\n" +                "     'hosts' = 'http://localhost:9200',\n" +                "     'index' = 'enriched_orders_lhc'\n" +                " );");        tableEnv.executeSql("INSERT INTO enriched_orders\n" +                " SELECT o.*, p.name, p.description\n" +                " FROM orders AS o\n" +                " LEFT JOIN products AS p ON o.product_id = p.id");        env.execute("Mysql to ES");    }

来源地址:https://blog.csdn.net/lhcnicholas/article/details/129854091

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用Flink CDC将Mysql中的数据实时同步到ES

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

基于Flink CDC实时同步数据(MySQL到MySQL)

一、环境 jdk8Flink 1.16.1(部署在远程服务器:192.168.137.99)Flink CDC 2.3.0MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 二、准备 准备三个数据库:flink_
2023-08-16

如何使用Flink CDC实现 Oracle数据库数据同步

目录前言一、开启归档日志二、创建flinkcdc专属用户2.1 对于oracle 非CDB数据库,执行如下sql2.2 对于Oracle CDB数据库,执行如下sql三、指定oracle表、库级启用四、使用flink-connector-o
如何使用Flink CDC实现 Oracle数据库数据同步
2024-08-21

DataX安装使用实现MySQL到MySQL数据同步

DataX安装使用实现MySQL到MySQL数据同步1.前置条件:1.1jdk安装jdk安装前往官网,这里我安装jdk-8u261解压sudo mkdir -p /opt/moudlesudo tar -zxvf jdk-8u261-linux-x64.tar
DataX安装使用实现MySQL到MySQL数据同步
2018-05-28

MySQL数据实时同步到MongoDB的实践分享

目录mysql 到 MongoDB 实时数据同步实操分享第一步:配置MySQL 连python接第android二步:配置 MongoDB 连接第三步:选择同步模式-全量/增量/全+增第四步:进行数据校验MySQL 到 MongoDB 实时
MySQL数据实时同步到MongoDB的实践分享
2024-01-29

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表 package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEn
2023-08-30

使用Canal实现PHP应用程序与MySQL数据库的实时数据同步

Canal是阿里巴巴开源的一个数据同步工具,可实现MySQL数据库到其他数据源的实时同步,PHP应用程序中可轻松使用,提高系统的可靠性和实时性,提供了丰富的API和文档支持
2023-05-16

利用python怎么将MySQL指定的表增量同步数据到clickhouse脚本

本篇文章为大家展示了利用python怎么将MySQL指定的表增量同步数据到clickhouse脚本,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。python实现MySQL指定表增量同步数据到clic
2023-06-06

使用阿里云实现数据同步到服务器的详细步骤

随着互联网的快速发展,数据的安全性和实时性变得越来越重要。阿里云数据同步到服务器是一种非常实用的方法,可以帮助我们实现数据的安全备份,以及在服务器之间快速、准确地传输数据。本文将详细介绍如何使用阿里云实现数据同步到服务器的步骤。一、准备工作首先,你需要在阿里云上注册一个账号,并购买相应的云服务,如阿里云的数据盘。
使用阿里云实现数据同步到服务器的详细步骤
2023-12-16

编程热搜

目录