我的编程空间,编程开发者的网络收藏夹
学习永远不晚

MySQL特定表全量、增量数据同步到消息队列怎么实现

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

MySQL特定表全量、增量数据同步到消息队列怎么实现

本篇内容主要讲解“MySQL特定表全量、增量数据同步到消息队列怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MySQL特定表全量、增量数据同步到消息队列怎么实现”吧!

    1、原始需求

    既要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应。

    数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。

    应用场景:数据ETL同步、降低业务服务器压力。

    2、解决方案

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    3、canal介绍、安装

    canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

    工作原理:mysql主备复制实现

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    从上层来看,复制分成三步:

    1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);

    2. slave将master的binary log events拷贝到它的中继日志(relay log);

    3. slave重做中继日志中的事件,将改变反映它自己的数据。

    canal的工作原理

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    原理相对比较简单:

    1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

    2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)

    3. canal解析binary log对象(原始为byte流)

    架构

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    说明:

    • server代表一个canal运行实例,对应于一个jvm

    • instance对应于一个数据队列 (1个server对应1..n个instance)

    instance模块:

    • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)

    • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)

    • eventStore (数据存储)

    • metaManager (增量订阅&消费信息管理器)

    安装

    mysql、kafka环境准备

    canal下载:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

    解压:tar -zxvf canal.deployer-1.1.3.tar.gz

    对目录conf里文件参数配置

    对canal.properties配置:

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    进入conf/example里,对instance.properties配置:

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    启动:bin/startup.sh

    日志查看:

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    4、验证

    开发对应的kafka消费者

    package org.kafka;import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;public class KafkaConsumerTest implements Runnable {    private final KafkaConsumer<String, String> consumer;    private ConsumerRecords<String, String> msgList;    private final String topic;    private static final String GROUPID = "groupA";    public KafkaConsumerTest(String topicName) {        Properties props = new Properties();        props.put("bootstrap.servers", "192.168.7.193:9092");        props.put("group.id", GROUPID);        props.put("enable.auto.commit", "true");        props.put("auto.commit.interval.ms", "1000");        props.put("session.timeout.ms", "30000");        props.put("auto.offset.reset", "latest");        props.put("key.deserializer", StringDeserializer.class.getName());        props.put("value.deserializer", StringDeserializer.class.getName());        this.consumer = new KafkaConsumer<String, String>(props);        this.topic = topicName;        this.consumer.subscribe(Arrays.asList(topic));    }    @Override    public void run() {        int messageNo = 1;        System.out.println("---------开始消费---------");        try {            for (; ; ) {                msgList = consumer.poll(1000);                if (null != msgList && msgList.count() > 0) {                    for (ConsumerRecord<String, String> record : msgList) {                        //消费100条就打印 ,但打印的数据不一定是这个规律的                            System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());//                            String v = decodeUnicode(record.value());//                            System.out.println(v);                        //当消费了1000条就退出                        if (messageNo % 1000 == 0) {                            break;                        }                        messageNo++;                    }                } else {                    Thread.sleep(11);                }            }        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            consumer.close();        }    }    public static void main(String args[]) {        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");        Thread thread1 = new Thread(test1);        thread1.start();    }        public static String gbEncoding(final String gbString) {        char[] utfBytes = gbString.toCharArray();        String unicodeBytes = "";        for (int i = 0; i < utfBytes.length; i++) {            String hexB = Integer.toHexString(utfBytes[i]);            if (hexB.length() <= 2) {                hexB = "00" + hexB;            }            unicodeBytes = unicodeBytes + "\\u" + hexB;        }        return unicodeBytes;    }        public static String decodeUnicode(final String dataStr) {        int start = 0;        int end = 0;        final StringBuffer buffer = new StringBuffer();        while (start > -1) {            end = dataStr.indexOf("\\u", start + 2);            String charStr = "";            if (end == -1) {                charStr = dataStr.substring(start + 2, dataStr.length());            } else {                charStr = dataStr.substring(start + 2, end);            }            char letter = (char) Integer.parseInt(charStr, 16); // 16进制parse整形字符串。            buffer.append(new Character(letter).toString());            start = end;        }        return buffer.toString();    }}

    对表bak1进行增加数据

    CREATE TABLE `bak1` (  `vin` varchar(20) NOT NULL,  `p1` double DEFAULT NULL,  `p2` double DEFAULT NULL,  `p3` double DEFAULT NULL,  `p4` double DEFAULT NULL,  `p5` double DEFAULT NULL,  `p6` double DEFAULT NULL,  `p7` double DEFAULT NULL,  `p8` double DEFAULT NULL,  `p9` double DEFAULT NULL,  `p0` double DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4show create table bak1;insert into bak1 select '李雷abcv',  `p1` ,  `p2` ,  `p3` ,  `p4` ,  `p5` ,  `p6` ,  `p7` ,  `p8` ,  `p9` ,  `p0`  from moci limit 10

    查看输出结果:

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    到此,相信大家对“MySQL特定表全量、增量数据同步到消息队列怎么实现”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档

    猜你喜欢

    MySQL特定表全量、增量数据同步到消息队列怎么实现

    本篇内容主要讲解“MySQL特定表全量、增量数据同步到消息队列怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MySQL特定表全量、增量数据同步到消息队列怎么实现”吧!1、原始需求既要同步
    2023-06-21

    python实现MySQL指定表增量同步数据到clickhouse的脚本

    python实现MySQL指定表增量同步数据到clickhouse,脚本如下:#!/usr/bin/env python3 # _*_ coding:utf8 _*_from pymysqlreplication import BinLog
    2022-05-14

    利用python怎么将MySQL指定的表增量同步数据到clickhouse脚本

    本篇文章为大家展示了利用python怎么将MySQL指定的表增量同步数据到clickhouse脚本,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。python实现MySQL指定表增量同步数据到clic
    2023-06-06

    编程热搜

    目录