我的编程空间,编程开发者的网络收藏夹
学习永远不晚

MySQL 同步 ES 实战,肝到爆!

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

MySQL 同步 ES 实战,肝到爆!

技术是什么?就是拿来玩的,边玩边学,才能成长得更快。

之前已经给大家讲解了 MySQL 同步 ES 的几种方案,下面就教大家如何通过 Canal,将 MySQL 同步到 ES,文章内容绝对妥妥干货!

本文会先讲解需要用到的基础知识,然后再是软件安装,最后就是实战部分。

不 BB,上文章目录:

01 基础知识

1.1 主从复制原理

MySQL 的主从复制是依赖于 binlog,也就是记录 MySQL 上的所有变化并以二进制形式保存在磁盘上二进制日志文件。

主从复制就是将 binlog 中的数据从主库传输到从库上,一般这个过程是异步的,即主库上的操作不会等待 binlog 同步地完成。

详细流程如下:

  1. 主库写 binlog:主库的更新 SQL(update、insert、delete) 被写到 binlog;
  2. 主库发送 binlog:主库创建一个 log dump 线程来发送 binlog 给从库;
  3. 从库写 relay log:从库在连接到主节点时会创建一个 IO 线程,以请求主库更新的 binlog,并且把接收到的 binlog 信息写入一个叫做 relay log 的日志文件;
  4. 从库回放:从库还会创建一个 SQL 线程读取 relay log 中的内容,并且在从库中做回放,最终实现主从的一致性。

1.2 Cannel 基础

Canel 是一款常用的数据同步工具,其原理是基于 Binlog 订阅的方式实现,模拟一个 MySQL Slave 订阅 Binlog 日志,从而实现 CDC(Change Data Capture),将已提交的更改发送到下游。

主要流程如下:

  1. Canal 服务端向 MySQL 的 master 节点传输 dump 协议;
  2. MySQL 的 master 节点接收到 dump 请求后推送 Binlog 日志给 Canal 服务端,解析 Binlog 对象(原始为 byte 流)转成 Json 格式;
  3. Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES。

下面是 Cannel 执行的核心流程,其中 Binlog Parser 主要负责 Binlog 的提取、解析和推送,EventSink 负责数据的过滤 、路由和加工,仅作了解即可。

02 软件下载安装

我的电脑是 Macos-x64,所以后面的软件安装,都是基于这个。

2.1 Java JDK

  • 官网:https://www.oracle.com/java/technologies/downloads/
  • JDK 版本:11.0.19

由于 Canal 和 ES 的安装,都强依赖 JDK,所以这里有必要先说明。

前方高能,这里有坑!!!

如果你选的版本不对,ES 安装可能会失败,然后 Canal 同步数据到 ES 时,也会出现很多诡异的问题。

2.2 MySQL

MySQL 大家应该都安装了,这里需要打开 MySQL 的 BinLog。

我是 Mac,主要新建一个 my.cnf 文件,然后再重启 MySQL。

这里重启 MySQL,我搞了半天,BinLog 开启后,会看到 BinLog 日志。

然后需要创建一个账号,账号和密码都是 Cannal,给后面 Canal 使用。

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' IDENTIFIED BY 'canal' ;

2.3 Canal

  • 官网:https://github.com/alibaba/canal/releases
  • 版本:v1.1.6

下载 canal.adapter 和 canal.deployer 两个就可以:

  • canal.deployer:相当于 canal 的服务端,启动它才可以在客户端接收数据库变更信息。
  • canal.adapter:增加客户端数据落地的适配及启动功能(当 deployer 接收到消息后,会根据不同的目标源做适配,比如是 es 目标源适配和 hbase 适配等等)。

备注:canal.admin 为 canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的 WebUI 操作界面,方便更多用户快速和安全的操作,我这边使用的是单机的,因此就没有下载安装,大家也可以拉 source code 源码去研究下。

2.4 ES

  • ES 官网:https://www.elastic.co/cn/downloads/elasticsearch
  • ES 版本:7.17.4

Mac 安装 ES 非常简单:

brew install elasticsearch

安装细节不赘述,安装成功后,输入以下网址:

http://localhost:9200/?pretty

2.5 Kibana

  • 下载网址:https://www.elastic.co/cn/downloads/past-releases#kibana
  • 版本:7.14.0

它是 ES 的界面化操作工具,安装细节不赘述,安装成功后,输入以下网址:

http://localhost:5601/app/dev_tools#/console

2.6 IK 分词器

  • 下载网址:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.17.2
  • 版本:v7.17.2

它是 ES 的分词器,安装细节不赘述,安装成功后,可以验证一下分词效果。

2.7 小节

MySQL 开启 BinLog,这个不难,主要观察是否有 BinLog 日志。

Canal 的安装是最复杂的,涉及到很多配置修改,后面会讲解。

最后是 ES + Kibana + IK 分词器,这个其实也不难,主要关注 ES 绑定的 JDK 版本,三款软件的安装可以参考这篇:https://blog.csdn.net/weixin_46049028/article/details/129956485

03 Canal 配置

3.1 canal.deployer 配置

修改 conf—>example 文件夹的 instance.properties 监听的数据库配置。

这里主要修改的监听 MySQL 的 URL、用户名和密码。

这里默认的账号密码就是 canal,前面已经教大家如何创建了。

3.2 canal.deployer 启动

在 canal.deployer 中的 bin 文件下去启动命令 startup.sh

这样就代表已经启动了,我们可以去看下启动日志。

上面 start successful 代表已经启动成功,并且已经监听我的 MySQL 数据库。

3.3 canal.adapte 配置

Step1: 先把 adapter 下面的 bootstrap.yml,全部注释掉,否则会提示你 XX 表不存在,这里坑了我好惨。

Step2: 再修改 adapter 的 application.yml 配置文件。

这里的坑,一般就是 mysql 的账号密码不对,或者给的 es 链接,没有"http://"前缀,这些都是我过踩的坑。

Step3: 修改我们在 application.yml 中配置的目标数据源 es7 文件夹内容。

由于我们这里是对表 article 进行去监听,因此我们在  es7 文件夹中去创建 article.yml 文件。

由于我们需要把技术派项目中的文章查询功能,改造成 ES 的查询方式,所以我们就把技术派的文章表 article,同步到 ES 中。

yml文件配置如下:

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的class="lazy" data-srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: article # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT t.id AS _id,t.id,t.user_id,t.article_type,t.title,t.short_title,t.picture,
        t.summary,t.category_id,t.source,t.source_url,t.offical_stat,t.topping_stat,
        t.cream_stat,t.`status`,t.deleted,t.create_time,t.update_time
        FROM article t"        # sql映射
  commitBatch: 1   # 提交批大小

Step4: 在 Kibana 中创建 ES 的 article 索引

代码如下:

PUT /article
{
    "mappings" : {
      "properties" : {
        "id" : {
          "type" : "integer"
        },
        "user_id" : {
          "type" : "integer"
        },
        "article_type" : {
          "type" : "integer"
        },
        "title" : {
          "type" : "text",
          "analyzer": "ik_max_word"
        },
        "short_title" : {
          "type" : "text",
          "analyzer": "ik_max_word"
        },
        "picture" : {
          "type" : "text",
          "analyzer": "ik_max_word"
        },
        "summary" : {
          "type" : "text",
          "analyzer": "ik_max_word"
        },
        "category_id" : {
          "type" : "integer"
        },
        "source" : {
          "type" : "integer"
        },
        "source_url" : {
          "type" : "text",
          "analyzer": "ik_max_word"
        },
        "offical_stat" : {
          "type" : "integer"
        },
        "topping_stat" : {
          "type" : "integer"
        },
        "cream_stat" : {
          "type" : "integer"
        },
        "status" : {
          "type" : "integer"
        },
        "deleted" : {
          "type" : "integer"
        },
        "create_time" : {
          "type" : "date"
        },
        "update_time" : {
          "type" : "date"
        }
      }
    }
 }

3.4 canal.adapte 启动

我们看下启动日志:

上面没有任何报错,并且已经启动了 8081 端口,说明已经启动成功,此时我们就可以操作了。

04 数据同步实战

4.1 全量同步

在开始 adapter 之后,我们应该先来一把全量数据同步,在源码中提供了一个接口进行全量同步,命令如下:

curl http://127.0.0.1:8081/etl/es7/article.yml -X POST

上面就是执行同步成功后,提示已经导入 10 条。

4.2 增量同步

增量数据就是当我在 MySQL 中 update、delete 和 insert 时,那么 ES 中数据也会对应发生变化,我下面演示下修改:

日志打印如下:

ES查询结果如下:

上面结果中说明 ES 已经更改成功。

05 总结

我们再回顾一下整体执行流程:

写到这里,就结束了,是不是满满的干货呢?基本是手把手教你如何将 MySQL 同步到 ES,不仅是增量同步,还包括全量同步,如果你的项目也需要用到该场景,基本可以直接照搬。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

MySQL 同步 ES 实战,肝到爆!

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

MySQL 同步 ES 实战,肝到爆!

基本是手把手教你如何将 MySQL 同步到 ES,不仅是增量同步,还包括全量同步,如果你的项目也需要用到该场景,基本可以直接照搬。
MySQLES场景2024-11-30

详解 canal 同步 MySQL 增量数据到 ES

canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Dis

Docker 搭建 Elasticsearch、Kibana、Logstash 同步 MySQL 数据到 ES

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。作为 Elastic Stack 的核心,Elasticsearch 会集中存储您的数据,让您飞快完成搜索,微调相关性,进行强
DockerMySQL2024-12-13

Docker搭建Elasticsearch、Kibana、Logstash 同步MySQL数据到ES

现在最新版就是8.5,最新的教程少和问题未知,小编选择7版本的,求一手稳定哈!
dockermysql2024-12-13

【ElasticSearch】ES与MySQL数据同步方案及Java实现

文章目录 一、同步实现思路1、方案一:同步调用2、方案二:异步通知3、方案三:监听binlog 二、实现ES与MySQL数据同步1、导入hotel-admin工程2、项目分析3、SpringAMQP整合4、声明队列和交换机5、发
2023-08-30

MySQL数据同步ES的四种方法!你能想到几种?

我们也很容易想到异步双写的办法,上架商品的时候,先把商品数据丢进MQ,为了解耦合,我们一般会拆分一个搜索服务,由搜索服务去订阅商品变动的消息,来完成同步。
ESMySQL数据2024-12-13

DTS数据传输同步MySQL至Elasticsearch实战

本章节通过RDS MySQL中的生产数据实时同步到阿里云Elasticsearch中进行搜索查询,通过数据传输服务DTS(Data Transmission Service)进行数据同步操作。
MySQLDTS数据2024-11-29

mysql实时同步到kafka中怎么实现

要实现MySQL实时同步到Kafka中,可以通过以下步骤来实现:使用Debezium连接MySQL数据库:Debezium是一个开源的CDC(Change Data Capture)工具,可以监控MySQL数据库的变化并将变化数据发送到Ka
mysql实时同步到kafka中怎么实现
2024-04-08

基于Flink CDC实时同步数据(MySQL到MySQL)

一、环境 jdk8Flink 1.16.1(部署在远程服务器:192.168.137.99)Flink CDC 2.3.0MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 二、准备 准备三个数据库:flink_
2023-08-16

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录