mysql增量同步到greenplum
采用工具:maxwell+Kafka+bireme
maxwell:maxwell能实时解析MySQL的binlog,并输出json格式的数据发送到Kafka(还支持其它的消息中间件),具体参见:maxwell官网
Kafka: 一种消息中间件,在该方案中主要用于消息中转,具体参见Kafka官网
bireme:支持Greenplum的数据增量同步工具,在写入Greenplum的过程中,由于采用Copy模式,所以性能较高,具体参见bireme官网
大致原理就是:利用maxwell把mysql binlog解析成json,然后用kafka创建topic,然后用bireme消费,从而达到增量,增量的前提是先把数据全量同步一次,然后再增量。
全量同步初始化个人推荐dbswitch工具作者项目地址,个人测试使用体验最佳,可以自动创建表结构,同步速度也很快。
操作步骤:
1.下载并搭建Kafka服务
2.下载并搭建maxwell服务,修改配置使其能够连接MySQL并能向kafka写入数据
3.下载并搭建bireme服务,修改配置使其能读取kafka的数据并能向Greenplum写入数据
kafka:
(1)下载安装:
wget http://mirrors.hust.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz -C /usr/local
(2)配置server.properties,我的简单配置如下:
[root@szwpldb1080 config]# cat server.properties |grep -vE "^#|^$"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=16
log.cleanup.policy=delete
log.segment.bytes=1073741824
log.retention.check.interval.ms=3000
delete.topic.enable = true
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
group.initial.rebalance.delay.ms=0
advertised.host.name=172.18.1.150
(3)制作kafka启停脚本(提前安装好java):
#!/usr/bin/env bash
# chkconfig: 2345 20 80
#description: start and stop server
ZOOP_HOME=/usr/local/kafka_2.12-2.5.0/bin
JAVA_HOME=/usr/java/jdk1.8.0_221/
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
SVR_NAME=kafka
case $1 in
start)
echo "starting $ZOOP_HOME/$SVR_NAME ..."
$ZOOP_HOME/kafka-server-start.sh /usr/local/kafka_2.12-2.5.0/config/server.properties > /tmp/kafka.logs.out&
;;
stop)
echo "stopping $PRO_HOME/$SVR_NAME ..."
ps -ef|grep *.$SVR_NAME* |grep -v grep |awk "{print $2}" | sed -e "s/^/kill -9 /g" | sh -
;;
restart)
"$0" stop
sleep 3
"$0" start
;;
status)
ps -ef|grep *.$SVR_NAME*
;;
logs)
tail -f /tmp/zookeeper.logs.out
;;
*)
echo "Example: server-$SVR_NAME [start|stop|restart|status|logs]" ;;
esac
添加到 /etc/rc.d/init.d ,然后就可以直接service kafka xxx来管理,或者添加到systemd下面,确保服务正常启动安装zookerper,然后启动kafka。
我的zookeeper简单配置如下:
[root@szwpldb1080 config]# cat zookeeper.properties |grep -Ev "^$|^#"
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
zookeeper启停脚本
#!/usr/bin/env bash
# chkconfig: 2345 20 80
#description: start and stop server
ZOOP_HOME=/usr/local/kafka_2.12-2.5.0/bin
JAVA_HOME=/usr/java/jdk1.8.0_221/
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
SVR_NAME=zookeeper
case $1 in
start)
echo "starting $ZOOP_HOME/$SVR_NAME ..."
$ZOOP_HOME/zookeeper-server-start.sh /usr/local/kafka_2.12-2.5.0/config/zookeeper.properties > /tmp/zookeeper.logs.out&
;;
stop)
echo "stopping $PRO_HOME/$SVR_NAME ..."
ps -ef|grep *.$SVR_NAME* |grep -v grep |awk "{print $2}" | sed -e "s/^/kill -9 /g" | sh -
;;
restart)
"$0" stop
sleep 3
"$0" start
;;
status)
ps -ef|grep *.$SVR_NAME*
;;
logs)
tail -f /tmp/zookeeper.logs.out
;;
*)
echo "Example: server-$SVR_NAME [start|stop|restart|status|logs]" ;;
esac
(4)检查状态:
[root@szwpldb1080 config]# jps
1762 Kafka
18521 QuorumPeerMain
30383 Jps
maxwell:
(1)docker镜像下载
docker pull zendesk/maxwell
(2)在源端mysql建好用户设置好权限,测试maxwell:
docker run -ti --rm zendesk/maxwell bin/maxwell --user="xxxx" --password="xxxx" --host="x.x.x.x" --producer=stdout
看到日志输出正常,可以放后台运行:
docker run -d --rm zendesk/maxwell bin/maxwell --user="xx"
--password="xx" --host="x.x.x.x" --port=3306
--producer=kafka --kafka.bootstrap.servers="x.x.x.x:9092"
--kafka_topic=syncdb --log_level=debug --output_ddl
然后创建kafka topic:
bin/kafka-topics.sh --create --topic syncdb --zookeeper localhost:2181 --partitions 1 --replication-factor 2
topic名字与maxwell创建的topic一致,并且由于maxwell可以解析所有binlog,但是bireme工具只能同步dml,因此没有加 --output_ddl
具体maxwell参数用法可以参考: maxwell配置
检查kafka消费情况:
[root@szwpldb1080 bin]# ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic syncdb
Topic:syncdb PartitionCount:1 ReplicationFactor:1 Configs:
Topic: syncdb Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@szwpldb1080 bin]#./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic syncdb --from-beginning
看到kafka能接收maxwell产生的json文件,表示没问题。
bireme
安装配置都很简单,此处略过。
主要是修改以下2个配置文件。
config.properties
maxwell1.properties
然后监控 http://x.x.x.x:8080或者监控bireme日志就可以了。
总结:
只能同步DML语句,无法处理DDL,对比几款开源的工具已经同步方式,我觉得我这种是最舒服的。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341