我的编程空间,编程开发者的网络收藏夹
学习永远不晚

FlinkCDC 实时监控 MySQL

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

FlinkCDC 实时监控 MySQL

通过 FlinkCDC 实现 MySQL 数据库、表的实时变化监控,这里只把变化打印了出来,后面会实现如何再写入其他 MySQL 库中;

1、开启 MySQL 的 binlog

在 my.cnf 中开启 binlog,我这里指定了 test 库,然后重启 MySQL

server.id=1log-bin=mysql-binbinlog-do-db=test

2、在 MySQL 中创建测试库和表

mysql> create database test;mysql> create table user_info(id int unsigned not null auto_increment primary key, username varchar(60), sex tinyint(1), nickname varchar(60), addr varchar(255))ENGINE=InnoDB default charset=utf8mb4;

3、Flink 代码

在 IDEA 中新建工程 flinkcdc

pom.xml

    4.0.0    com.zsoft.flinkcdc    flinkcdc    1.0-SNAPSHOT            8        8        1.13.1                                    org.apache.flink            flink-java            ${flink.version}                            org.apache.flink            flink-streaming-java_2.12            ${flink.version}                            org.apache.flink            flink-clients_2.12            ${flink.version}                            org.apache.hadoop            hadoop-client            3.1.3                            mysql            mysql-connector-java            8.0.22                            com.alibaba.ververica            flink-connector-mysql-cdc            1.4.0                            com.alibaba            fastjson            1.2.75                                                    org.apache.maven.plugins                maven-assembly-plugin                3.0.0                                                            jar-with-dependencies                                                                                                make-assembly                        package                        single                                                                                    

resources/log4j.properties

log4j.rootLogger=warn,stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

反序列化类:

com/zsoft/flinkcdc/MyDeserializationSchema.java

package com.zsoft.flinkcdc;import com.alibaba.fastjson.JSONObject;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;import io.debezium.data.Envelope;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;public class MyDeserializationSchema implements DebeziumDeserializationSchema {    @Override    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {        Struct valueStruct = (Struct) sourceRecord.value();        Struct sourceStruct = valueStruct.getStruct("source");        // 获取数据库的名称        String database = sourceStruct.getString("db");        // 获取表名        String table = sourceStruct.getString("table");        // 获取类型( c -> insert, u -> update)        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();        if(type.equals("create")){            type = "insert";        }        JSONObject jsonObj = new JSONObject();        jsonObj.put("database",database);        jsonObj.put("table", table);        jsonObj.put("type", type);        // 获取数据 data        Struct afterStruct = valueStruct.getStruct("after");        JSONObject dataJsonObj = new JSONObject();        if(afterStruct != null) {            for(Field field : afterStruct.schema().fields()) {                String fieldName = field.name();                Object fieldValue = afterStruct.get(field);                dataJsonObj.put(fieldName, fieldValue);            }        }        jsonObj.put("data", dataJsonObj);        collector.collect(jsonObj.toJSONString());    }    @Override    public TypeInformation getProducedType() {        return TypeInformation.of(String.class);    }}

主类:

com/zsoft/flinkcdc/FlinkCdcDataStream.java

package com.zsoft.flinkcdc;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Properties;public class FlinkCdcDataStream {    public static void main(String[] args) throws Exception {        // TODO 1. 准备流处理环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        // TODO 2. 开启检查点        // 2.1 开启 Checkpoint        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);        // 2.2 设置超时时间        env.getCheckpointConfig().setCheckpointTimeout(60000);        // 2.3 指定从 CK 自动重启策略        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 6000L));        // 2.4 设置任务关闭时候保留最后一次 CK 数据        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);        // 2.5 设置状态后端        env.setStateBackend(new FsStateBackend("hdfs://s1:8020/flinkCDC_DS"));        // 2.6 设置访问 HDFS 的用户名        System.setProperty("HADOOP_USER_NAME", "hadoop");        // TODO 3. 创建 Flink-MySQL-CDC 的 Source        Properties props = new Properties();        props.setProperty("scan.startup.mode", "initial");        SourceFunction sourceFunction = MySQLSource.builder()                .hostname("s1")                .port(3306)                .username("root")                .password("123456")                .databaseList("test")                .tableList("test.user_info")                .startupOptions(StartupOptions.earliest())                .debeziumProperties(props)                .deserializer(new MyDeserializationSchema())                .build();        // TODO 4. 使用 CDC Source 从 MySQL 读取数据        DataStreamSource mysqlDS = env.addSource(sourceFunction).setParallelism(1);        // TODO 5. 打印输出        mysqlDS.print();        // TODO 6. 执行任务        env.execute();    }}

4、打包运行

在 IDEA 中打包项目 package

将生成的 flinkcdc-1.0-SNAPSHOT-jar-with-dependencies.jar 通过 Flink 的 webUI 上传

在 Flink 的 WebUI 中上传 jar 包

Submit New Job 页面点击 + Add New 按钮

上传后的 jar 包下填入:

  • Entry Class:com.zsoft.flinkcdc.FlinkCdcDataStream
  • Parallelism:1
  • Program Arguments:
  • Savepoint Path:

点击 ”Submit“ 提交应用

5、测试

此时在 MySQL 中插入如下数据:

mysql> insert into user_info values(null, 'zhangsan', 1, 'zhs','beijing');

mysql> insert into user_info values(null, 'lisi', 1, 'ls','shanghai');

mysql> insert into user_info values(null, 'wangwu', 1, 'ww','wangwu');

在 Flink 的 webUI 中 Task Managers 中点击项目,在 Stdout 中有输出日志:

{"database":"test","data":{"sex":1,"nickname":"zhs","id":1,"addr":"beijing","username":"zhangsan"},"type":"insert","table":"user_info"}{"database":"test","data":{"sex":1,"nickname":"ls","id":2,"addr":"shanghai","username":"lisi"},"type":"insert","table":"user_info"}{"database":"test","data":{"sex":1,"nickname":"ww","id":3,"addr":"wangwu","username":"wangwu"},"type":"insert","table":"user_info"}

来源地址:https://blog.csdn.net/zhy0414/article/details/129692546

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

FlinkCDC 实时监控 MySQL

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

shell监控脚本实例—监控mysql主从复制

本节内容:监控mysql主从复制的shell脚本。 说明:监控脚本在 rhel5 下测试正常,其它版本的linux 系统请自行测试,需要的一些准备工作可以查看这篇文章 代码:#监控mysql 主从复制cat chk_mysql_rep.sh
2022-06-04

如何用Python实时监控

这期内容当中小编将会给大家带来有关如何用Python实时监控,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。最近突然有个奇妙的想法,就是当我对着电脑屏幕的时候,电脑会先识别屏幕上的人脸是否是本人,如果识别是
2023-06-01
2024-04-02

promethus监控mysql

下载页面https://github.com/prometheus/mysqld_exporter/releases下载最新版本https://github.com/prometheus/mysqld_exporter/releases/d
2023-01-31

MySQL监控参数

1.查看xx库所有表信息参数详解Name(表名称)Engine(存储引擎)Version(版本)Row_format(行格式。对于MyISAM引擎,这可能是Dynamic,Fixed或Compressed。动态行的行长度可变例如Varchar或Blob类型字段
MySQL监控参数
2019-09-29

pmm-server监控mysql

https://blog.csdn.net/RunzIyy/article/details/104635680?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLea
pmm-server监控mysql
2017-09-22

PHP 安全漏洞的实时监控

实时监控 php 安全漏洞的方法:安装 sentry 库并配置 sentry dsn捕获错误和异常,并记录安全漏洞标签创建 sentry 警报,根据安全漏洞标记触发识别和记录安全漏洞,以及时采取保护措施PHP 安全漏洞的实时监控引言PH
PHP 安全漏洞的实时监控
2024-05-01

人工智能用Python实时监控

编程学习网:最近突然有个奇妙的想法,就是当我对着电脑屏幕的时候,电脑会先识别屏幕上的人脸是否是本人,如果识别是本人的话需要回答电脑说的暗语,答对了才会解锁并且有三次机会。
人工智能用Python实时监控
2024-04-23

编程热搜

目录