Canal——数据同步
1.数据同步到数据库:
- 在介绍方案2之前我们先来介绍一下MySQL复制的原理,如下图所示:
-
- 主服务器操作数据,并将数据写入Bin log
- 从服务器调用I/O线程读取主服务器的Bin log,并且写入到自己的Relay log中,再调用SQL线程从Relay log中解析数据,从而同步到自己的数据库中
- 方案2就是:
-
- 上面MySQL的整个复制流程可以总结为一句话,那就是:从服务器读取主服务器Bin log中的数据,从而同步到自己的数据库中
- 我们方案2也是如此,就是在概念上把主服务器改为MySQL,把从服务器改为Redis而已(如下图所示),当MySQL中有数据写入时,我们就解析MySQL的Bin log,然后将解析出来的数据写入到Redis中,从而达到同步的效果
- 例如下面是一个云数据库实例分析:
-
- 云数据库与本地数据库是主从关系。云数据库作为主数据库主要提供写,本地数据库作为从数据库从主数据库中读取数据
- 本地数据库读取到数据之后,解析Bin log,然后将数据写入写入同步到Redis中,然后客户端从Redis读数据
- 这个技术方案的难点就在于: 如何解析MySQL的Bin Log。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于binlog存在Statement/Row/Mixedlevel多种形式,分析binlog实现同步的工作量是非常大的
Canal开源技术
- canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)
- 开源参考地址有:https://github.com/liukelin/canal_mysql_nosql_sync
- 工作原理(模仿MySQL复制):
-
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
- 架构:
-
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
- instance模块:
- 大致的解析过程如下:
-
- parse解析MySQL的Bin log,然后将数据放入到sink中
- sink对数据进行过滤,加工,分发
- store从sink中读取解析好的数据存储起来
- 然后自己用设计代码将store中的数据同步写入Redis中就可以了
- 其中parse/sink是框架封装好的,我们做的是store的数据读取那一步
- 更多关于Cancl可以百度搜索
- 下面是运行拓扑图
- MySQL表的同步,采用责任链模式,每张表对应一个Filter 。例如zvsync中要用到的类设计如下:
- 下面是具体化的zvsync中要用到的类 ,每当新增或者删除表时,直接进行增删就可以了
Canal的架构设计
Canal 伪装成 MySQL 从节点,mySQL master 会推送 binary log 给canal,canal读取 MySQL binlog的变更信息并生成消息,客户端订阅这些数据变更消息,处理并存储。只要开发一个 Canal客户端就可以解析出MySQL的操作,再将这些数据发送到大数据流计算处理引擎,即可以实现对 MySQL 实时处理。
我们一般使用canal时,只需要引入一个客户端,比如java类似这样:
然后就可以订阅binlog消息了。
另外canal也支持直接把binlog消息发送到mq,这样对多语言的支持更好一些。
canal自身帮我们做了很多事,这样我们自己写的客户端才能更简单,更专注于业务。下面就来看看canal内部的架构。
说明:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列
核心是instance模块,它包含:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
parser模块
parser模块用来订阅binlog事件,然后通过sink投递到store。parser模块底层依赖dbsync、driver模块。
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
meta模块
核心接口为CanalMetaManager,实现了订阅&消费的机制,主要用于记录canal消费到的mysql binlog的位置。CanalMetaManager接口有几个实现类:
FileMixedMetaManager
MemoryMetaManager
MixedMetaManager
PeriodMixedMetaManager
ZooKeeperMetaManager
这些实现类之间有些会持有其它实现的引用来装饰自己的功能(装饰器模式),
CanalServer的两种实现方式
这两个实现代表了canal的两种应用模式,CanalServerWithNetty在canal独立部署场景发挥作用,开发者只需要实现cient,不同的应用通过canal client与canal server进行通信,canal client的请求统一由CanalServerWithNetty接受进行处理。
而通过CanalServerWithEmbeded,可以不需要独立部署canal,而是把canal嵌入到我们自己的服务里。但是这种对开发者的要求就比较高。
下面的图表示二者的关系,
CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理(4个模块 )
CanalInstance模块解析
1.CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理
- instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件
- 这幅图我们可以看出instance是怎么生成的。
CanalInstanceGenerator相当于一个工厂类,通过 destination 产生特定的 CanalInstance,它有两个实现:
ManagerCanalInstanceGenerator类,manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
SpringCanalInstanceGenerator类,spring方式:基于spring xml + properties进行定义,构建spring配置
- 具体使用哪个,是通过配置的 canal.properties文件
canal.instance.global.mode = spring
- 先来看下spring的版本实现,
- 源码里面给了几个默认的模板选择实现
- 然后部署的时候,我们可以通过在canal.properties配置文件中指定使用哪个文件:
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
- 几种实现不不同:
- spring/memory-instance.xml
- spring/file-instance.xml
- spring/default-instance.xml
- spring/group-instance.xml
这几个文件的主要区别是,metaManager 和eventParser 这两个配置有所不同,可能在内存、文件或zk进行存储。
- generate方法返回的是CanalInstanceWithSpring这个实现类,它继承自AbstractCanalInstance,并且实现了CanalInstance。这个类的实现只有几十行,之所以这么少是因为大部分的逻辑都已经通过spring的配置文件实现了,如下
- 具体类之间关系
- start方法和stop方法没什么可讲的,就是启停instance内部的组件。
- beforeStartEventParser和afterStartEventParser是eventParser启动的前置和后置操作
前者调用了startEventParserInternal,后者调用了stopEventParserInternal,
就是分别负责了CanalLogPositionManager和CanalHAController的启动停止工作。
- CanalLogPositionManager记录binlog最后一次解析成功位置,有不同的实现,可以保存在内存,zk等存在介质里。mysql在主从同步过程中,slave自己需要维护binlog的消费进度信息。而canal伪装成slave,因此也要维护这样的信息。
- CanalHAController主要是通过失败检测, 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库。它只有一个实现类HeartBeatHAController
- 失败转换的逻辑也很简单,定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机
- 心跳的逻辑在 com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.MysqlDetectingTimeTask 实现,是个定时器。
总结
总体来看,CanalInstance模块本身没有什么特别复杂的逻辑,它的核心处理都在parser、sink、store、metamanager等内部组件里
CANAL官方文档
基本说明
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
- kafka: GitHub - apache/kafka: Mirror of Apache Kafka
- RocketMQ : GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
- RabbitMQ : GitHub - rabbitmq/rabbitmq-server: Open source RabbitMQ: core server and tier 1 (built-in) plugins
- pulsarmq : https://github.com/apache/pulsar
环境版本
- 操作系统:CentOS release 6.6 (Final)
- java版本: jdk1.8
- canal 版本: 请下载最新的安装包,本文以当前v1.1.1 的canal.deployer-1.1.1.tar.gz为例
- MySQL版本 :5.7.18
- 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口
一、 安装zookeeper
二、安装MQ
- Kafka安装参考:Kafka QuickStart
- RocketMQ安装参考:RocketMQ QuickStart
- RabbitMQ安装参考:RabbitMQ QuickStart
- PulsarMQ安装参考:PulsarMQ QuickStart
三、 安装canal.server
3.1 下载压缩包
到官网地址(release)下载最新压缩包,请下载 canal.deployer-latest.tar.gz
3.2 将canal.deployer 复制到固定目录并解压
mkdir -p /usr/local/canal cp canal.deployer-1.1.6.tar.gz /usr/local/canal tar -zxvf canal.deployer-1.1.6.tar.gz
3.3 配置修改参数
a. 修改instance 配置文件 vi conf/example/instance.properties
# 按需修改成自己的数据库信息 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # mq config canal.mq.topic=example # 针对库名或者表名发送动态topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################
对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart
b. 修改canal 配置文件vi /usr/local/canal/conf/canal.properties
# ... # 可选项: tcp(默认), kafka,RocketMQ,rabbitmq,pulsarmq canal.serverMode = kafka # ... # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false
mq相关参数说明 (<=1.1.4版本)
参数名 | 参数说明 | 默认值 |
canal.mq.servers | kafka为bootstrap.servers | 127.0.0.1:6667 |
canal.mq.retries | 发送失败重试次数 | 0 |
canal.mq.batchSize | kafka为ProducerConfig.BATCH_SIZE_CONFIG | 16384 |
canal.mq.maxRequestSize | kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG | 1048576 |
canal.mq.lingerMs | kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200 | 1 |
canal.mq.bufferMemory | kafka为ProducerConfig.BUFFER_MEMORY_CONFIG | 33554432 |
canal.mq.acks | kafka为ProducerConfig.ACKS_CONFIG | all |
canal.mq.kafka.kerberos.enable | kafka为ProducerConfig.ACKS_CONFIG | false |
canal.mq.kafka.kerberos.krb5FilePath | kafka kerberos认证 | ../conf/kerberos/krb5.conf |
canal.mq.kafka.kerberos.jaasFilePath | kafka kerberos认证 | ../conf/kerberos/jaas.conf |
canal.mq.producerGroup | kafka无意义 | Canal-Producer |
canal.mq.accessChannel | kafka无意义 | local |
--- | --- | --- |
canal.mq.vhost= | rabbitMQ配置 | 无 |
canal.mq.exchange= | rabbitMQ配置 | 无 |
canal.mq.username= | rabbitMQ配置 | 无 |
canal.mq.password= | rabbitMQ配置 | 无 |
canal.mq.aliyunuid= | rabbitMQ配置 | 无 |
--- | --- | --- |
canal.mq.canalBatchSize | 获取canal数据的批次大小 | 50 |
canal.mq.canalGetTimeout | 获取canal数据的超时时间 | 100 |
canal.mq.parallelThreadSize | mq数据转换并行处理的并发度 | 8 |
canal.mq.flatMessage | 是否为json格式 | false |
--- | --- | --- |
canal.mq.topic | mq里的topic名 | 无 |
canal.mq.dynamicTopic | mq里的动态topic规则, 1.1.3版本支持 | 无 |
canal.mq.partition | 单队列模式的分区下标, | 1 |
canal.mq.partitionsNum | 散列模式的分区数 | 无 |
canal.mq.partitionHash | 散列规则定义 | 无 |
mq相关参数说明 (>=1.1.5版本)
在1.1.5版本开始,引入了MQ Connector设计,因此参数配置做了部分调整
参数名 | 参数说明 | 默认值 |
canal.aliyun.accessKey | 阿里云ak | 无 |
canal.aliyun.secretKey | 阿里云sk | 无 |
canal.aliyun.uid | 阿里云uid | 无 |
canal.mq.flatMessage | 是否为json格式 | false |
canal.mq.canalBatchSize | 获取canal数据的批次大小 | 50 |
canal.mq.canalGetTimeout | 获取canal数据的超时时间 | 100 |
canal.mq.accessChannel = local | 是否为阿里云模式,可选值local/cloud | local |
canal.mq.database.hash | 是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算 | true |
canal.mq.send.thread.size | MQ消息发送并行度 | 30 |
canal.mq.build.thread.size | MQ消息构建并行度 | 8 |
------ | ----------- | ------- |
kafka.bootstrap.servers | kafka服务端地址 | 127.0.0.1:9092 |
kafka.acks | kafka为ProducerConfig.ACKS_CONFIG | all |
kafka.compression.type | 压缩类型 | none |
kafka.batch.size | kafka为ProducerConfig.BATCH_SIZE_CONFIG | 16384 |
kafka.linger.ms | kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200 | 1 |
kafka.max.request.size | kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG | 1048576 |
kafka.buffer.memory | kafka为ProducerConfig.BUFFER_MEMORY_CONFIG | 33554432 |
kafka.max.in.flight.requests.per.connection | kafka为ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION | 1 |
kafka.retries | 发送失败重试次数 | 0 |
kafka.kerberos.enable | kerberos认证 | false |
kafka.kerberos.krb5.file | kerberos认证 | ../conf/kerberos/krb5.conf |
kafka.kerberos.jaas.file | kerberos认证 | ../conf/kerberos/jaas.conf |
------ | ----------- | ------- |
rocketmq.producer.group | rocketMQ为ProducerGroup名 | test |
rocketmq.enable.message.trace | 是否开启message trace | false |
rocketmq.customized.trace.topic | message trace的topic | 无 |
rocketmq.namespace | rocketmq的namespace | 无 |
rocketmq.namesrv.addr | rocketmq的namesrv地址 | 127.0.0.1:9876 |
rocketmq.retry.times.when.send.failed | 重试次数 | 0 |
rocketmq.vip.channel.enabled | rocketmq是否开启vip channel | false |
rocketmq.tag | rocketmq的tag配置 | 空值 |
--- | --- | --- |
rabbitmq.host | rabbitMQ配置 | 无 |
rabbitmq.virtual.host | rabbitMQ配置 | 无 |
rabbitmq.exchange | rabbitMQ配置 | 无 |
rabbitmq.username | rabbitMQ配置 | 无 |
rabbitmq.password | rabbitMQ配置 | 无 |
rabbitmq.deliveryMode | rabbitMQ配置 | 无 |
--- | --- | --- |
pulsarmq.serverUrl | pulsarmq配置 | 无 |
pulsarmq.roleToken | pulsarmq配置 | 无 |
pulsarmq.topicTenantPrefix | pulsarmq配置 | 无 |
--- | --- | --- |
canal.mq.topic | mq里的topic名 | 无 |
canal.mq.dynamicTopic | mq里的动态topic规则, 1.1.3版本支持 | 无 |
canal.mq.partition | 单队列模式的分区下标, | 1 |
canal.mq.enableDynamicQueuePartition | 动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义,目前主要适用于RocketMQ | false |
canal.mq.partitionsNum | 散列模式的分区数 | 无 |
canal.mq.dynamicTopicPartitionNum | mq里的动态队列分区数,比如针对不同topic配置不同partitionsNum | 无 |
canal.mq.partitionHash | 散列规则定义 | 无 |
canal.mq.dynamicTopic 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔
- 例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
- 例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
- 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
- 例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
- 例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
- 例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
- 例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
- 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
- 例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
- 例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值
大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力
canal.mq.partitionHash 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔
- 例子1:test\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
- 例子2:.*\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
- 例子3:.*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
- 例子4: 匹配规则啥都不写,则默认发到0这个partition上
- 例子5:.*\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
-
- 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
- 例子6: test\\.test:id,.\\..* , 针对test的表按照id散列,其余的表按照table散列
注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)
其他详细参数可参考Canal AdminGuide
mq顺序性问题
binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答
- canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
- canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
- canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
- canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
- canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
- 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
- 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
- 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意
MQ发送性能数据
1.1.5版本可以在5k~50k左右,具体可参考:Canal-MQ-Performance
阿里云RocketMQ对接参数
# 配置ak/sk canal.aliyun.accessKey = XXX canal.aliyun.secretKey = XXX # 配置topic canal.mq.accessChannel = cloud canal.mq.servers = 内网接入点 canal.mq.producerGroup = GID_**group(在后台创建) canal.mq.namespace = rocketmq实例id canal.mq.topic=(在后台创建)
kafka ssl配置参数
# canal.properties配置文件 kafka.kerberos.enable = true kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf" kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
3.4 启动
cd /usr/local/canal/ sh bin/startup.sh
3.5 查看日志
a.查看 logs/canal/canal.log
vi logs/canal/canal.log
b. 查看instance的日志:
vi logs/example/example.log
3.6 关闭
cd /usr/local/canal/ sh bin/stop.sh
3.7 MQ数据消费
canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能
- kafka模式: com.alibaba.otter.canal.example.kafka.CanalKafkaClientExample
- rocketMQ模式: com.alibaba.otter.canal.example.rocketmq.CanalRocketMQClientExample
- rocketMQ模式: com.alibaba.otter.canal.example.rocketmq.CanalRocketMQClientExample
Footer
© 2023 GitHub, Inc.
Footer navigation
Canal部分源码
1.通过 destination 产生特定的 CanalInstance
来源地址:https://blog.csdn.net/weixin_43241803/article/details/129562160
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341