我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

目录

1. 环境介绍

2. mysql建表

3. flinksql建表

3.1 进入flinksql客户端 

​3.2 配置输出格式

​3.3 flink建表

3.4 任务流配置

4. 测试

4.1 插入测试数据

4.2 查看结果表数据​

4.3 新增测试数据

4.4 再次查看结果表数据


1. 环境介绍

服务版本
zookeeper3.8.0
kafka3.3.1
flink1.13.5
mysql5.7.34
jdk1.8
scala2.12

连接器作用
flink-sql-connector-upsert-kafka_2.11-1.13.6.jar连接kafka,支持主键更新
flink-connector-mysql-cdc-2.0.2.jar读mysql
flink-connector-jdbc_2.11-1.13.6.jar写mysql
mysql-connector-java-5.1.37.jar连接mysql

2. mysql中建表

CREATE TABLE class="lazy" data-src_mysql_order( order_id BIGINT, store_id BIGINT, sales_amt double, PRIMARY KEY (`order_id`));CREATE TABLE class="lazy" data-src_mysql_order_detail( order_id BIGINT, store_id BIGINT, goods_id BIGINT, sales_amt double, PRIMARY KEY (order_id,store_id,goods_id));CREATE TABLE dim_store( store_id BIGINT, store_name varchar(100), PRIMARY KEY (`store_id`) );CREATE TABLE dim_goods( goods_id BIGINT, goods_name varchar(100), PRIMARY KEY (`goods_id`));CREATE TABLE dwa_mysql_order_analysis (store_id BIGINT,store_name varchar(100),sales_goods_distinct_nums bigint,sales_amt double,order_nums bigint,PRIMARY KEY (store_id,store_name));

3. flinksql建表

3.1 进入flinksql客户端 

sql-client.sh embedded

​3.2 配置输出格式

SET sql-client.execution.result-mode=tableau;


3.3 flink建表

--mysql中的 订单主表CREATE TABLE class="lazy" data-src_mysql_order( order_id BIGINT, store_id BIGINT, sales_amt double, PRIMARY KEY (`order_id`) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'hadoop002', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'class="lazy" data-src_mysql_order', 'scan.incremental.snapshot.enabled' = 'false');--mysql中的 订单明细表CREATE TABLE class="lazy" data-src_mysql_order_detail( order_id BIGINT, store_id BIGINT, goods_id BIGINT, sales_amt double, PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'hadoop002', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'class="lazy" data-src_mysql_order_detail', 'scan.incremental.snapshot.enabled' = 'false');--mysql中的 商店维表CREATE TABLE dim_store( store_id BIGINT, store_name varchar(100), PRIMARY KEY (`store_id`) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'hadoop002', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'dim_store', 'scan.incremental.snapshot.enabled' = 'false');--mysql中的 商品维表CREATE TABLE dim_goods( goods_id BIGINT, goods_name varchar(100), PRIMARY KEY (`goods_id`) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'hadoop002', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'dim_goods', 'scan.incremental.snapshot.enabled' = 'false');--kafka中的 ods层 订单表CREATE TABLE ods_kafka_order ( order_id BIGINT, store_id BIGINT, sales_amt double, PRIMARY KEY (`order_id`) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_kafka_order', 'properties.bootstrap.servers' = 'hadoop001:9092', 'properties.group.id' = 'ods_group1',  'key.format' = 'json', 'value.format' = 'json');----kafka中的 ods层 订单明细表CREATE TABLE ods_kafka_order_detail ( order_id BIGINT, store_id BIGINT, goods_id BIGINT, sales_amt double, PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_kafka_order_detail', 'properties.bootstrap.servers' = 'hadoop001:9092', 'properties.group.id' = 'ods_group1',  'key.format' = 'json', 'value.format' = 'json');--kafka中的 dwd层 订单表CREATE TABLE dwd_kafka_order ( order_id BIGINT, store_id BIGINT, sales_amt double, PRIMARY KEY (`order_id`) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dwd_kafka_order', 'properties.bootstrap.servers' = 'hadoop001:9092', 'properties.group.id' = 'dwd_group1',  'key.format' = 'json', 'value.format' = 'json');--kafka中的 dwd层 订单明细表CREATE TABLE dwd_kafka_order_detail ( order_id BIGINT, store_id BIGINT, goods_id BIGINT, sales_amt double, PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dwd_kafka_order_detail', 'properties.bootstrap.servers' = 'hadoop001:9092', 'properties.group.id' = 'dwd_group1',  'key.format' = 'json', 'value.format' = 'json');--mysql中的dwa 订单指标统计CREATE TABLE dwa_mysql_order_analysis (store_id BIGINT,store_name varchar(100),sales_goods_distinct_nums bigint,sales_amt double,order_nums bigint,PRIMARY KEY (store_id,store_name) NOT ENFORCED) WITH (   'connector' = 'jdbc',   'url' = 'jdbc:mysql://hadoop002:3306/test',   'table-name' = 'dwa_mysql_order_analysis',    'driver' = 'com.mysql.cj.jdbc.Driver',    'username' = 'root',    'password' = 'root','sink.buffer-flush.max-rows' = '10');

3.4 任务流配置

--任务流配置insert into ods_kafka_order select * from class="lazy" data-src_mysql_order;insert into ods_kafka_order_detail select * from class="lazy" data-src_mysql_order_detail;insert into dwd_kafka_order select * from ods_kafka_order;insert into dwd_kafka_order_detail select * from ods_kafka_order_detail;insert into dwa_mysql_order_analysisselect orde.store_id as store_id,store.store_name as store_name,count(distinct order_detail.goods_id) as sales_goods_distinct_nums,sum(order_detail.sales_amt) as sales_amt,count(distinct orde.order_id) as order_numsfrom dwd_kafka_order as ordejoin dwd_kafka_order_detailas order_detailon  orde.order_id = order_detail.order_idjoin dim_store as store on  orde.store_id = store.store_id group by orde.store_id,store.store_name ;

查看flink管理界面,可以看到有5个正在运行的任务,实时流就配置好了

4. 测试

4.1 插入测试数据

insert into class="lazy" data-src_mysql_order values (20221210001,10000,50),(20221210002,10000,20),(20221210003,10001,10);insert into class="lazy" data-src_mysql_order_detail values (20221210001,10000,100000,30),(20221210001,10000,100001,20),(20221210002,10000,100001,20),(20221210003,10001,100000,10);insert into dim_store values (10000, '宇唐总店'),(10001, '宇唐一店'),(10002, '宇唐二店'),(10003, '宇唐三店');insert into dim_goods values (100000, '天狮达特浓缩枣浆'),(100001, '蜜炼柚子茶');

4.2 查看结果表数据

4.3 新增测试数据

insert into class="lazy" data-src_mysql_order values  (20221210004,10002,50), (20221210005,10003,30);insert into class="lazy" data-src_mysql_order_detail values (20221210004,10002,100000,30),(20221210004,10002,100001,20),(20221210005,10003,100000,10),(20221210005,10003,100001,20);

 4.4 再次查看结果表数据

来源地址:https://blog.csdn.net/TangYuG/article/details/128268085

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

下载Word文档到电脑,方便收藏和打印~

下载Word文档

编程热搜

目录