我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Canal——数据同步

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Canal——数据同步

1.数据同步到数据库:

  • 在介绍方案2之前我们先来介绍一下MySQL复制的原理,如下图所示:
    • 主服务器操作数据,并将数据写入Bin log
    • 从服务器调用I/O线程读取主服务器的Bin log,并且写入到自己的Relay log中,再调用SQL线程从Relay log中解析数据,从而同步到自己的数据库中

  • 方案2就是:
    • 上面MySQL的整个复制流程可以总结为一句话,那就是:从服务器读取主服务器Bin log中的数据,从而同步到自己的数据库中
    • 我们方案2也是如此,就是在概念上把主服务器改为MySQL,把从服务器改为Redis而已(如下图所示),当MySQL中有数据写入时,我们就解析MySQL的Bin log,然后将解析出来的数据写入到Redis中,从而达到同步的效果

  • 例如下面是一个云数据库实例分析:
    • 云数据库与本地数据库是主从关系。云数据库作为主数据库主要提供写,本地数据库作为从数据库从主数据库中读取数据
    • 本地数据库读取到数据之后,解析Bin log,然后将数据写入写入同步到Redis中,然后客户端从Redis读数据

  • 这个技术方案的难点就在于: 如何解析MySQL的Bin Log。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于binlog存在Statement/Row/Mixedlevel多种形式,分析binlog实现同步的工作量是非常大的

Canal开源技术

  • canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)
  • 开源参考地址有:https://github.com/liukelin/canal_mysql_nosql_sync
  • 工作原理(模仿MySQL复制):
    • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
    • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
    • canal解析binary log对象(原始为byte流)

  • 架构:
    • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
    • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
    • eventStore (数据存储)
    • metaManager (增量订阅&消费信息管理器)
    • server代表一个canal运行实例,对应于一个jvm
    • instance对应于一个数据队列 (1个server对应1..n个instance)
    • instance模块:

  • 大致的解析过程如下:
    • parse解析MySQL的Bin log,然后将数据放入到sink中
    • sink对数据进行过滤,加工,分发
    • store从sink中读取解析好的数据存储起来
    • 然后自己用设计代码将store中的数据同步写入Redis中就可以了
    • 其中parse/sink是框架封装好的,我们做的是store的数据读取那一步

  • 更多关于Cancl可以百度搜索
  • 下面是运行拓扑图

  • MySQL表的同步,采用责任链模式,每张表对应一个Filter 。例如zvsync中要用到的类设计如下:

  • 下面是具体化的zvsync中要用到的类 ,每当新增或者删除表时,直接进行增删就可以了

Canal的架构设计

Canal 伪装成 MySQL 从节点,mySQL master 会推送 binary log 给canal,canal读取 MySQL binlog的变更信息并生成消息,客户端订阅这些数据变更消息,处理并存储。只要开发一个 Canal客户端就可以解析出MySQL的操作,再将这些数据发送到大数据流计算处理引擎,即可以实现对 MySQL 实时处理。

我们一般使用canal时,只需要引入一个客户端,比如java类似这样:

com.alibaba.otter

canal.client

1.1.0

然后就可以订阅binlog消息了。

另外canal也支持直接把binlog消息发送到mq,这样对多语言的支持更好一些。

canal自身帮我们做了很多事,这样我们自己写的客户端才能更简单,更专注于业务。下面就来看看canal内部的架构。

说明:

server代表一个canal运行实例,对应于一个jvm

instance对应于一个数据队列

核心是instance模块,它包含:

  1. eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)

parser模块

parser模块用来订阅binlog事件,然后通过sink投递到store。parser模块底层依赖dbsync、driver模块。

eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)

  1. eventStore (数据存储)

  1. metaManager (增量订阅&消费信息管理器)

meta模块

核心接口为CanalMetaManager,实现了订阅&消费的机制,主要用于记录canal消费到的mysql binlog的位置。CanalMetaManager接口有几个实现类:

FileMixedMetaManager

MemoryMetaManager

MixedMetaManager

PeriodMixedMetaManager

ZooKeeperMetaManager

这些实现类之间有些会持有其它实现的引用来装饰自己的功能(装饰器模式),

CanalServer的两种实现方式

这两个实现代表了canal的两种应用模式,CanalServerWithNetty在canal独立部署场景发挥作用,开发者只需要实现cient,不同的应用通过canal client与canal server进行通信,canal client的请求统一由CanalServerWithNetty接受进行处理。

而通过CanalServerWithEmbeded,可以不需要独立部署canal,而是把canal嵌入到我们自己的服务里。但是这种对开发者的要求就比较高。

下面的图表示二者的关系,

CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理(4个模块 )

CanalInstance模块解析

1.CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理

  1. instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件

  1. 这幅图我们可以看出instance是怎么生成的。

CanalInstanceGenerator相当于一个工厂类,通过 destination 产生特定的 CanalInstance,它有两个实现:

ManagerCanalInstanceGenerator类,manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)

SpringCanalInstanceGenerator类,spring方式:基于spring xml + properties进行定义,构建spring配置

  1. 具体使用哪个,是通过配置的 canal.properties文件
canal.instance.global.mode = spring

  1. 先来看下spring的版本实现,

  1. 源码里面给了几个默认的模板选择实现

  1. 然后部署的时候,我们可以通过在canal.properties配置文件中指定使用哪个文件:
canal.instance.global.spring.xml = classpath:spring/file-instance.xml  
  1. 几种实现不不同:
  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

这几个文件的主要区别是,metaManager 和eventParser 这两个配置有所不同,可能在内存、文件或zk进行存储。

  1. generate方法返回的是CanalInstanceWithSpring这个实现类,它继承自AbstractCanalInstance,并且实现了CanalInstance。这个类的实现只有几十行,之所以这么少是因为大部分的逻辑都已经通过spring的配置文件实现了,如下

  2. 具体类之间关系

  1. start方法和stop方法没什么可讲的,就是启停instance内部的组件。
  2. beforeStartEventParser和afterStartEventParser是eventParser启动的前置和后置操作

前者调用了startEventParserInternal,后者调用了stopEventParserInternal,

就是分别负责了CanalLogPositionManager和CanalHAController的启动停止工作。

  1. CanalLogPositionManager记录binlog最后一次解析成功位置,有不同的实现,可以保存在内存,zk等存在介质里。mysql在主从同步过程中,slave自己需要维护binlog的消费进度信息。而canal伪装成slave,因此也要维护这样的信息。
  2. CanalHAController主要是通过失败检测, 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库。它只有一个实现类HeartBeatHAController

  1. 失败转换的逻辑也很简单,定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机

  1. 心跳的逻辑在 com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.MysqlDetectingTimeTask 实现,是个定时器。

总结

总体来看,CanalInstance模块本身没有什么特别复杂的逻辑,它的核心处理都在parser、sink、store、metamanager等内部组件里

CANAL官方文档

基本说明

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

环境版本

  • 操作系统:CentOS release 6.6 (Final)
  • java版本: jdk1.8
  • canal 版本: 请下载最新的安装包,本文以当前v1.1.1 的canal.deployer-1.1.1.tar.gz为例
  • MySQL版本 :5.7.18
  • 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

一、 安装zookeeper

参考:Zookeeper QuickStart

二、安装MQ

三、 安装canal.server

3.1 下载压缩包

到官网地址(release)下载最新压缩包,请下载 canal.deployer-latest.tar.gz

3.2 将canal.deployer 复制到固定目录并解压

mkdir -p /usr/local/canal cp canal.deployer-1.1.6.tar.gz /usr/local/canal tar -zxvf canal.deployer-1.1.6.tar.gz

3.3 配置修改参数

a. 修改instance 配置文件 vi conf/example/instance.properties

# 按需修改成自己的数据库信息 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # mq config canal.mq.topic=example # 针对库名或者表名发送动态topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart

b. 修改canal 配置文件vi /usr/local/canal/conf/canal.properties

# ... # 可选项: tcp(默认), kafka,RocketMQ,rabbitmq,pulsarmq canal.serverMode = kafka # ... # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false

mq相关参数说明 (<=1.1.4版本)

参数名

参数说明

默认值

canal.mq.servers

kafka为bootstrap.servers
rocketMQ中为nameserver列表

127.0.0.1:6667

canal.mq.retries

发送失败重试次数

0

canal.mq.batchSize

kafka为ProducerConfig.BATCH_SIZE_CONFIG
rocketMQ无意义

16384

canal.mq.maxRequestSize

kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG
rocketMQ无意义

1048576

canal.mq.lingerMs

kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200
rocketMQ无意义

1

canal.mq.bufferMemory

kafka为ProducerConfig.BUFFER_MEMORY_CONFIG
rocketMQ无意义

33554432

canal.mq.acks

kafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义

all

canal.mq.kafka.kerberos.enable

kafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义

false

canal.mq.kafka.kerberos.krb5FilePath

kafka kerberos认证
rocketMQ无意义

../conf/kerberos/krb5.conf

canal.mq.kafka.kerberos.jaasFilePath

kafka kerberos认证
rocketMQ无意义

../conf/kerberos/jaas.conf

canal.mq.producerGroup

kafka无意义
rocketMQ为ProducerGroup名

Canal-Producer

canal.mq.accessChannel

kafka无意义
rocketMQ为channel模式,如果为aliyun则配置为cloud

local

---

---

---

canal.mq.vhost=

rabbitMQ配置

canal.mq.exchange=

rabbitMQ配置

canal.mq.username=

rabbitMQ配置

canal.mq.password=

rabbitMQ配置

canal.mq.aliyunuid=

rabbitMQ配置

---

---

---

canal.mq.canalBatchSize

获取canal数据的批次大小

50

canal.mq.canalGetTimeout

获取canal数据的超时时间

100

canal.mq.parallelThreadSize

mq数据转换并行处理的并发度

8

canal.mq.flatMessage

是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码

false

---

---

---

canal.mq.topic

mq里的topic名

canal.mq.dynamicTopic

mq里的动态topic规则, 1.1.3版本支持

canal.mq.partition

单队列模式的分区下标,

1

canal.mq.partitionsNum

散列模式的分区数

canal.mq.partitionHash

散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

mq相关参数说明 (>=1.1.5版本)

在1.1.5版本开始,引入了MQ Connector设计,因此参数配置做了部分调整

参数名

参数说明

默认值

canal.aliyun.accessKey

阿里云ak

canal.aliyun.secretKey

阿里云sk

canal.aliyun.uid

阿里云uid

canal.mq.flatMessage

是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码

false

canal.mq.canalBatchSize

获取canal数据的批次大小

50

canal.mq.canalGetTimeout

获取canal数据的超时时间

100

canal.mq.accessChannel = local

是否为阿里云模式,可选值local/cloud

local

canal.mq.database.hash

是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算

true

canal.mq.send.thread.size

MQ消息发送并行度

30

canal.mq.build.thread.size

MQ消息构建并行度

8

------

-----------

-------

kafka.bootstrap.servers

kafka服务端地址

127.0.0.1:9092

kafka.acks

kafka为ProducerConfig.ACKS_CONFIG

all

kafka.compression.type

压缩类型

none

kafka.batch.size

kafka为ProducerConfig.BATCH_SIZE_CONFIG

16384

kafka.linger.ms

kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200

1

kafka.max.request.size

kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG

1048576

kafka.buffer.memory

kafka为ProducerConfig.BUFFER_MEMORY_CONFIG

33554432

kafka.max.in.flight.requests.per.connection

kafka为ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION

1

kafka.retries

发送失败重试次数

0

kafka.kerberos.enable

kerberos认证

false

kafka.kerberos.krb5.file

kerberos认证

../conf/kerberos/krb5.conf

kafka.kerberos.jaas.file

kerberos认证

../conf/kerberos/jaas.conf

------

-----------

-------

rocketmq.producer.group

rocketMQ为ProducerGroup名

test

rocketmq.enable.message.trace

是否开启message trace

false

rocketmq.customized.trace.topic

message trace的topic

rocketmq.namespace

rocketmq的namespace

rocketmq.namesrv.addr

rocketmq的namesrv地址

127.0.0.1:9876

rocketmq.retry.times.when.send.failed

重试次数

0

rocketmq.vip.channel.enabled

rocketmq是否开启vip channel

false

rocketmq.tag

rocketmq的tag配置

空值

---

---

---

rabbitmq.host

rabbitMQ配置

rabbitmq.virtual.host

rabbitMQ配置

rabbitmq.exchange

rabbitMQ配置

rabbitmq.username

rabbitMQ配置

rabbitmq.password

rabbitMQ配置

rabbitmq.deliveryMode

rabbitMQ配置

---

---

---

pulsarmq.serverUrl

pulsarmq配置

pulsarmq.roleToken

pulsarmq配置

pulsarmq.topicTenantPrefix

pulsarmq配置

---

---

---

canal.mq.topic

mq里的topic名

canal.mq.dynamicTopic

mq里的动态topic规则, 1.1.3版本支持

canal.mq.partition

单队列模式的分区下标,

1

canal.mq.enableDynamicQueuePartition

动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义,目前主要适用于RocketMQ

false

canal.mq.partitionsNum

散列模式的分区数

canal.mq.dynamicTopicPartitionNum

mq里的动态队列分区数,比如针对不同topic配置不同partitionsNum

canal.mq.partitionHash

散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

canal.mq.partitionHash 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.*\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.*\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test\\.test:id,.\\..* , 针对test的表按照id散列,其余的表按照table散列

注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

其他详细参数可参考Canal AdminGuide

mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  1. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

MQ发送性能数据

1.1.5版本可以在5k~50k左右,具体可参考:Canal-MQ-Performance

阿里云RocketMQ对接参数

# 配置ak/sk canal.aliyun.accessKey = XXX canal.aliyun.secretKey = XXX # 配置topic canal.mq.accessChannel = cloud canal.mq.servers = 内网接入点 canal.mq.producerGroup = GID_**group(在后台创建) canal.mq.namespace = rocketmq实例id canal.mq.topic=(在后台创建)

kafka ssl配置参数

# canal.properties配置文件 kafka.kerberos.enable = true kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf" kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

3.4 启动

cd /usr/local/canal/ sh bin/startup.sh

3.5 查看日志

a.查看 logs/canal/canal.log

vi logs/canal/canal.log

b. 查看instance的日志:

vi logs/example/example.log

3.6 关闭

cd /usr/local/canal/ sh bin/stop.sh

3.7 MQ数据消费

canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能

Footer

© 2023 GitHub, Inc.

Footer navigation

Canal部分源码

1.通过 destination 产生特定的 CanalInstance

来源地址:https://blog.csdn.net/weixin_43241803/article/details/129562160

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Canal——数据同步

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SpringBoot整合Canal数据同步的方法

这篇文章主要介绍“SpringBoot整合Canal数据同步的方法”,在日常操作中,相信很多人在SpringBoot整合Canal数据同步的方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoo
2023-06-29

详解 canal 同步 MySQL 增量数据到 ES

canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Dis

SpringBoot使用Canal做MySQL与Redis的数据同步

mysql-bin.000003 是首次配置的,如果master节点重启了,这个文件会递增变为mysql-bin.000004,这时我们的从节点会自动连上这mysql-bin.000004。

MySQL高性能实现Canal数据同步神器

目录简介配置mysqlCentos7 安装 canalJava客户端简介Canal是阿里巴巴基于Java开源的数据同步工具。平时业务场景使用比较多的如下:同步数据到ES、Redis缓存中。数据同步。业务需要监听数据。图片来源阿里巴巴
2022-08-11

DBSyncer/Canal/Kafka-主流数据同步中间件对比

数据库数据同步中间件是用于实现数据库之间数据同步的工具或组件,它可以处理多种数据库类型,包括MySQL、Oracle、SQL Server等。

怎么使用canal+Kafka进行数据库同步操作

这篇文章主要介绍了怎么使用canal+Kafka进行数据库同步操作的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用canal+Kafka进行数据库同步操作文章都会有所收获,下面我们一起来看看吧。平时工作中
2023-06-27

在SpringBoot中通过Canal实现MySQL与Redis的数据同步

mysql-bin.000003 是首次配置的,如果master节点重启了,这个文件会递增变为mysql-bin.000004,这时我们的从节点会自动连上这mysql-bin.000004。

使用Canal实现PHP应用程序与MySQL数据库的实时数据同步

Canal是阿里巴巴开源的一个数据同步工具,可实现MySQL数据库到其他数据源的实时同步,PHP应用程序中可轻松使用,提高系统的可靠性和实时性,提供了丰富的API和文档支持
2023-05-16

Canal进行MySQL到MySQL数据库全量+增量同步踩坑指南

背景最近工作中遇到一个迁移数据库的需求,需要将数据库从A服务器迁移至B服务器,为了尽量减少迁移导致的停机时间,考虑使用全量迁移+增量同步的方式,最终选择使用Canal作为迁移工具准备工作1. 数据库两台服务器的数据库都需要提前准备好
2023-10-03

实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

数据同步一直是一个令人头疼的问题。在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方。

编程热搜

目录