我的编程空间,编程开发者的网络收藏夹
学习永远不晚

flink 使用sql实现kafka生产者和消费者

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码


	flink 使用sql实现kafka生产者和消费者


	flink 使用sql实现kafka生产者和消费者
[数据库教程]

maven依赖


        UTF-8
        1.8
        1.8
        1.11.2
        1.1.7
        1.7.25
    

    
        
            
            org.apache.flink
            flink-json
            ${flink.version}
        

        
            org.apache.flink
            flink-java
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-clients_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-kafka_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-wikiedits_2.12
            ${flink.version}
        

        
            org.apache.flink
            flink-table-planner_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-blink_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java
            ${flink.version}
        
        
            org.slf4j
            slf4j-api
            ${slf4j.version}
        
        
            ch.qos.logback
            logback-core
            ${logback.version}
        
        
            ch.qos.logback
            logback-classic
            ${logback.version}
        

        
            org.projectlombok
            lombok
            1.16.18
        
    

生产者

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;


//@Slf4j
public class KafkaTableStreamApiProducerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        DataStream eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        String ddl = "CREATE TABLE CustomerStatusChangedEvent(
" +
                "customerId int,
" +
                "oldStatus int,
" +
                "newStatus int,
" +
                "eventTime bigint
" +
                ") WITH(
" +
                "‘connector.type‘=‘kafka‘,
" +
                "‘connector.version‘=‘universal‘,
" +

                "‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.12.87:9092‘,
" +
                "‘connector.topic‘=‘customer_statusChangedEvent‘,
" +
               
                "‘format.type‘=‘json‘
" +
                ")
"
                ;
        tableEnvironment.executeSql(ddl);


        while (true) {
            try {
                TimeUnit.SECONDS.sleep(3);
                int status = (int) (System.currentTimeMillis() % 3);
                String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +
                        "values(1001,1," + status + "," + System.currentTimeMillis() + ")";
                tableEnvironment.executeSql(insert);
            } catch (Exception ex) {

            }
        }

    }
}

 

消费者

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


//@Slf4j
public class KafkaTableStreamApiConsumerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        DataStream eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        String ddl = "CREATE TABLE CustomerStatusChangedEvent(
" +
                "customerId int,
" +
                "oldStatus int,
" +
                "newStatus int,
" +
                "eventTime bigint
" +
                ") WITH(
" +
                "‘connector.type‘=‘kafka‘,
" +
                "‘connector.version‘=‘universal‘,
" +
                "‘connector.properties.group.id‘=‘g2_group‘,
" +
                "‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092‘,
" +
                "‘connector.topic‘=‘customer_statusChangedEvent‘,
" +
                "‘connector.startup-mode‘ = ‘latest-offset‘,
" +
                "‘format.type‘=‘json‘
" +
                ")
";
        tableEnvironment.executeSql(ddl);

        Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +
                " from CustomerStatusChangedEvent" +
                " where newStatus in(1,2)"
        );


    
        DataStream result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);
        result.print();

        try {
            env.execute();
        } catch (Exception ex) {

        }
    }

    public static class CustomerStatusLog {
        private Long customerId;

        private Integer status;

        public Long getCustomerId() {
            return customerId;
        }

        public void setCustomerId(Long customerId) {
            this.customerId = customerId;
        }

        public Integer getStatus() {
            return status;
        }

        public void setStatus(Integer newStatus) {
            this.status = newStatus;
        }

        public CustomerStatusLog() {

        }

        @Override
        public String toString() {
            return "CustomerStatusLog{" +
                    "customerId=" + customerId +
                    ", status=" + status +
                    ‘}‘;
        }
    }
}

 

消费者打印

4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}

flink 使用sql实现kafka生产者和消费者

原文地址:https://www.cnblogs.com/zhshlimi/p/13725081.html

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

flink 使用sql实现kafka生产者和消费者

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

flink 使用sql实现kafka生产者和消费者

1.maven依赖 UTF-8 1.8 1.8 1.11.2 1.1.7 1.7.25 org.a

	flink 使用sql实现kafka生产者和消费者
2015-10-30

kafka-3python生产者和消费者

程序分为productor.py是发送消息端,consumer为消费消息端,启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,productor.py#!/usr/bin/env python2
2023-01-31

Kafka中生产者和消费者指的是什么

在Kafka中,生产者和消费者是指Kafka消息系统中参与消息传递的两种角色。生产者是指负责向Kafka集群中的主题(topic)发布消息的客户端应用程序。生产者将消息发送到指定的主题,并且可以选择指定消息的键(key),以及消息所属的分
Kafka中生产者和消费者指的是什么
2024-03-14

使用 sarama 监控 Kafka 生产者和消费者的性能数据

从现在开始,我们要努力学习啦!今天我给大家带来《使用 sarama 监控 Kafka 生产者和消费者的性能数据》,感兴趣的朋友请继续看下去吧!下文中的内容我们主要会涉及到等等知识点,如果在阅读本文过程中有遇到不清楚的地方,欢迎留言呀!我们一
使用 sarama 监控 Kafka 生产者和消费者的性能数据
2024-04-04

Golang rabbitMQ生产者和消费者怎么实现

今天小编给大家分享一下Golang rabbitMQ生产者和消费者怎么实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。消费
2023-06-30

java 中怎么实现生产者消费者

今天就跟大家聊聊有关java 中怎么实现生产者消费者,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。生产者消费者图存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,
2023-06-17

PHP实现生产者与消费者的案例

这篇文章主要介绍PHP实现生产者与消费者的案例,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装
2023-06-14

java中BlockingQueue如何实现生产者消费者

这篇文章主要为大家展示了“java中BlockingQueue如何实现生产者消费者”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“java中BlockingQueue如何实现生产者消费者”这篇文章
2023-05-30

golang生产者消费者模式怎么实现

在Go语言中,可以使用goroutine和channel来实现生产者消费者模式。首先,我们定义一个包含生产者和消费者的函数:func producer(ch chan<- int) {for i := 0; i < 10; i++ {c
2023-10-20

Queue 实现生产者消费者模型(实例讲解)

Python中,队列是线程间最常用的交换数据的形式。 Python Queue模块有三种队列及构造函数: 1、Python Queue模块的FIFO队列先进先出。 class Queue.Queue(maxsize) 2、LIFO类似于堆,
2022-06-04

Docker怎么启动RabbitMQ实现生产者与消费者

这篇文章主要讲解了“Docker怎么启动RabbitMQ实现生产者与消费者”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Docker怎么启动RabbitMQ实现生产者与消费者”吧!一、Doc
2023-07-05

java中生产者和消费者问题实例分析

这篇“java中生产者和消费者问题实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“java中生产者和消费者问题实例分析
2023-06-29

C++实现简单的生产者-消费者队列详解

这篇文章主要为大家详细介绍了如何利用C++实现一个简单的生产者-消费者队列,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
2023-05-18

python多进程中的生产者和消费者模型怎么实现

这篇文章主要介绍了python多进程中的生产者和消费者模型怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇python多进程中的生产者和消费者模型怎么实现文章都会有所收获,下面我们一起来看看吧。Pytho
2023-07-05

java中的wait()和notify()方法实现生产者消费者模式实例

这篇文章主要介绍“java中的wait()和notify()方法实现生产者消费者模式实例”,在日常操作中,相信很多人在java中的wait()和notify()方法实现生产者消费者模式实例问题上存在疑惑,小编查阅了各式资料,整理出简单好用的
2023-06-20

Go语言实现一个简单生产者消费者模型

目录一、生产者消费者模型二、Go语言实现1、无缓冲channel2、有缓冲channel三、实际应用简介:介绍生产者消费者模型,及go简单实现的demo。 一、生产者消费者模型 生产者消费者模型:某个模块(函数等〉负责产生数据,这些数据由另
2022-06-07

编程热搜

目录