我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用datax实现增量同步mysql数据库数据(定时任务)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用datax实现增量同步mysql数据库数据(定时任务)

使用datax来做数据全量同步很简单,增量同步该怎样做呢,接下来就一起试试吧

下载datax(前提CentOS已安装jdk等运行环境),解压(路径自定),使用centos7自带的python执行datax.py,运行自检

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gztar -zxvf datax.tar.gz  && mv datax /usr/local/cd /usr/local/datax/bin/python datax.py  /usr/local/datax/job/job.json

如果报错参考这篇博客:https://www.cnblogs.com/juanxincai/p/16258154.html

下面是解决办法

cd /usr/local/datax/plugin/readerll -a[root@Data1 reader]# ll -atotal 76drwxr-xr-x 21 502 games 4096 Feb 19 21:05 .drwxr-xr-x  4 502 games   66 Feb 19 21:05 ..drwxr-xr-x  3 502 games  224 Feb 19 21:05 cassandrareader-rwxr-xr-x  1 502 games  212 Oct 12  2019 ._cassandrareader....删除._开头语文件rm -f ._*cd /usr/local/datax/plugin/writer/rm -f ._*

可以看到,自检成功

同步思路:

使用python查询数据源库表,查询到的最后一条数据的时间保存在一个txt文件中,下次执行再读取,加上cron任务从而定时同步时间间隔中的数据(增量同步)

想用pymysql就得升级python3,centos7自带python2,安装看这里:https://www.cnblogs.com/juanxincai/p/16280031.html

安装完成后再运行自检,执行报错看这里:https://www.cnblogs.com/juanxincai/p/16284779.html

接下来需要4个东西:

1,执行读取和写入的mysqltomysql.json,(我这里文件名叫new.json)里面有数据源库表的信息,读取的字段等设置,并且接收外部传入的两个时间参数(格式化为时间戳),路径为/usr/local/datax/job下,修改注意格式为标准json,格式化无问题再使用

{    "job": {        "content": [            {                "reader": {                    "name": "mysqlreader",                    "parameter": {                        "username": "用户名",                        "password": "密码",                        "where": "data_time >= FROM_UNIXTIME(${create_time}) and data_time  < FROM_UNIXTIME(${end_time})",                        "column": ["id","data_time","name","age","insert_time"                        ],                        "connection": [{    "table": [        "表名"    ],    "jdbcUrl": [        "jdbc:mysql://数据源IP:3306/数据库名?useUnicode=true&characterEncoding=utf8"    ]}                        ]                    }                },                "writer": {                    "name": "mysqlwriter",                    "parameter": {                        "writeMode": "update",                        "username": "用户名",                        "password": "密码",                        "column": ["id","data_time","name","age","insert_time"                        ],                        "connection": [{    "jdbcUrl": "jdbc:mysql://目标库IP:3306/数据库名?useUnicode=true&characterEncoding=utf8",    "table": [        "表名"    ]}                        ]                    }                }            }        ],        "setting": {            "speed": {                "channel": 6            }        }    }}

定时执行的python脚本,用于获取数据源库表最后一条数据时间并写入txt文件,执行datax.py运行上面的new.json(文件名:mysql2mysqlexecute.py)

#!/usr/bin/env python3# coding: utf-8import subprocess as spimport time,os,sysimport pymysqlimport pickleprint ("going to execute")configFilePath = sys.argv[1]logFilePath = sys.argv[2]lastDataTime=""def save_variable(v, filename):    f = open(filename, 'wb')    pickle.dump(v, f)    f.close()    return filenamedef load_variavle(filename):    ff = open(filename, 'rb')    r = pickle.load(ff,encoding ="UTF-8")    ff.close()    return rstartTime=load_variavle('/usr/local/datax/job/tempTime.txt')  #这个就是存放临时时间变量的txt文件,注意编码格式,读取起始时间def do_sql(sql):db = pymysql.connect(host = '数据源库IP',port = 3306,user = '用户名',passwd = '密码',db = '数据库名')#创建连接(连接数据库)cursor = db.cursor()  #创建游标cursor = db.cursor(cursor=pymysql.cursors.DictCursor)   #设置游标格式为字典格式,即取值时会以字典的形式呈现cursor.execute(sql) #执行sql语句rs=cursor.fetchall() #for r in rs: #print (r)content=rsdb.commit()  #提交,以保存执行结果cursor.close()   #关闭游标db.close()     #关闭连接x = rs[0]['dataTime']return x;print("startTime=",startTime) #输出格式化的同步开始日期startTimeArray = time.strptime(startTime, "%Y-%m-%d %H:%M:%S")startTimeStamp = int(time.mktime(startTimeArray))sql='SELECT CAST(data_time AS CHAR) as dataTime FROM 表名 ORDER BY data_time DESC LIMIT 1'lastDataTime = do_sql(sql)print("endTime=",lastDataTime)lastDataTimeArray = time.strptime(lastDataTime, "%Y-%m-%d %H:%M:%S")lastDataTimeTimeStamp = int(time.mktime(lastDataTimeArray))try:script2execute  = "/usr/bin/python3 /usr/local/datax/bin/datax.py %s -p \"-Dcreate_time=%s -Dend_time=%s\" >> %s"%(configFilePath,startTimeStamp,lastDataTimeTimeStamp,logFilePath)print("to be excute script:",script2execute)os.system(script2execute)#sp.run(script2execute)except IOError:print(IOError) print("script execute ending")save_variable(lastDataTime,'/usr/local/datax/job/tempTime.txt') #保存临时时间变量作为下次的开始时间print("ending---")

定时执行同步任务的sh脚本:(文件名:timeMission.sh),这里可以看到执行的时候将new.json文件位置传入,还有日志的路径

#! /bin/bashsource /etc/profile/usr/bin/python3 /usr/local/datax/job/mysql2mysqlexecute.py  '/usr/local/datax/job/new.json'  '/usr/local/datax/job/test_job.log'   '/usr/loal/datax/job/test_job.record'

可以看到上面的读取与存放临时时间变量的文件名叫做:tempTime.txt ,自己新建就好,注意路径和编码格式

既然用到了sh,就记得给脚本赋予执行权限

chmod +x ./xxx.sh

接下来就可以编写定时任务,这里我们使用corntab,有问题参考这篇:https://www.cnblogs.com/juanxincai/p/15852374.html

crontab -e
SHELL=/bin/bash */5 * * * *  /usr/local/datax/job/timeMission.sh

退出保存 :wq 加上这两行,代表每5分钟执行一次timeMission.sh,也就是五分钟同步一次

crontab -l

可以看到,我们的定时任务已经写入

systemctl reload crond.servicesystemctl restart crond.service

重启和重新加载cron服务,查看任务执行日志输出

tail -f /var/spool/mail/root

再看一下datax的运行日志

 tail -200f /usr/local/datax/job/test_job.log 

可以看到,数据已同步,具体还可以优化,请参考datax官方文档,搭建运行中间不要怕出问题,大胆尝试细心排错,有可能格式,执行权限,文件编码都会造成执行不成功,祝大家一次同步成功

来源地址:https://blog.csdn.net/zxj19880502/article/details/129435341

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用datax实现增量同步mysql数据库数据(定时任务)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

DataX安装使用实现MySQL到MySQL数据同步

DataX安装使用实现MySQL到MySQL数据同步1.前置条件:1.1jdk安装jdk安装前往官网,这里我安装jdk-8u261解压sudo mkdir -p /opt/moudlesudo tar -zxvf jdk-8u261-linux-x64.tar
DataX安装使用实现MySQL到MySQL数据同步
2018-05-28

SpringBoot定时任务实现数据同步的方法

业务需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步,这篇文章主要介绍了SpringBoot定时任务实现数据同步的方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
2022-11-13

定时任务备份mysql数据库,同时备份多个数据库

编写备份数据库的shell脚本创建脚本mkdir -p /root/mysql_dump/datacd /root/mysql_dumpvim mysql_back.sh脚本内容如下#!/bin/sh# File: /root/mysql_dump/mysql
2018-09-25

使用Kettle定时从数据库A同步数据到数据库B

一、需求背景 由于项目场景原因,需要将A库(MySQL)中的表a、表b、表c中的数据定时T+1 增量的同步到B库(MySQL)。这里说明一下,不是数据库的主从备份,就是普通的数据同步。经过技术调研,发现Kettle挺合适的,原因如下: Ke
2023-08-24

python实现MySQL指定表增量同步数据到clickhouse的脚本

python实现MySQL指定表增量同步数据到clickhouse,脚本如下:#!/usr/bin/env python3 # _*_ coding:utf8 _*_from pymysqlreplication import BinLog
2022-05-14

使用Canal实现PHP应用程序与MySQL数据库的实时数据同步

Canal是阿里巴巴开源的一个数据同步工具,可实现MySQL数据库到其他数据源的实时同步,PHP应用程序中可轻松使用,提高系统的可靠性和实时性,提供了丰富的API和文档支持
2023-05-16

MySQL特定表全量、增量数据同步到消息队列怎么实现

本篇内容主要讲解“MySQL特定表全量、增量数据同步到消息队列怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MySQL特定表全量、增量数据同步到消息队列怎么实现”吧!1、原始需求既要同步
2023-06-21

shell脚本实现数据库表增量同步的流程

需求: 每天定时将 源数据库 study_plan 库的 zxxt_class 表 增量同步到 目标数据库 axt_statistics 库的 zxxt_class 表中 前提条件: 两个库中的 zxxt_class 表结构一致
2022-06-04

如何使用Flink CDC实现 Oracle数据库数据同步

目录前言一、开启归档日志二、创建flinkcdc专属用户2.1 对于oracle 非CDB数据库,执行如下sql2.2 对于Oracle CDB数据库,执行如下sql三、指定oracle表、库级启用四、使用flink-connector-o
如何使用Flink CDC实现 Oracle数据库数据同步
2024-08-21

利用python怎么将MySQL指定的表增量同步数据到clickhouse脚本

本篇文章为大家展示了利用python怎么将MySQL指定的表增量同步数据到clickhouse脚本,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。python实现MySQL指定表增量同步数据到clic
2023-06-06

编程热搜

目录