我的编程空间,编程开发者的网络收藏夹
学习永远不晚

「Flink」使用Java lambda表达式实现Flink WordCount

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

「Flink」使用Java lambda表达式实现Flink WordCount

「Flink」使用Java lambda表达式实现Flink WordCount

本篇我们将使用Java语言来实现Flink的单词统计。

代码开发

环境准备

导入Flink 1.9 pom依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-javaartifactId>
            <version>1.9.0version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-streaming-java_2.11artifactId>
            <version>1.9.0version>
        dependency>
        <dependency>
            <groupId>org.apache.commonsgroupId>
            <artifactId>commons-lang3artifactId>
            <version>3.7version>
        dependency>
    dependencies>

构建Flink流处理环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定义source

每秒生成一行文本

DataStreamSource wordLineDS = env.addSource(new RichSourceFunction() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext ctx) throws Exception {
                // 每秒发送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

单词计算

// 3. 单词统计
        // 3.1 将文本行切分成一个个的单词
        SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 将单词转换为一个个的元组
        SingleOutputStreamOperator> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照单词进行分组
        KeyedStream, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 对每组单词数量进行累加
        SingleOutputStreamOperator> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

参考代码

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. 构建Flink流式初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 自定义source - 每秒发送一行文本
        DataStreamSource wordLineDS = env.addSource(new RichSourceFunction() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext ctx) throws Exception {
                // 每秒发送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

        // 3. 单词统计
        // 3.1 将文本行切分成一个个的单词
        SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 将单词转换为一个个的元组
        SingleOutputStreamOperator> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照单词进行分组
        KeyedStream, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 对每组单词数量进行累加
        SingleOutputStreamOperator> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}

Flink对Java Lambda表达式支持情况

Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。

我们来看下上述的这段代码:

SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码

@Public
@FunctionalInterface
public interface FlatMapFunction extends Function, Serializable {


void flatMap(T value, Collector out) throws Exception;
}

我们发现 flatMap的第二个参数是Collector,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:

void flatMap(T value, Collector out)

这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of "Collector" are missing.
    In many cases lambda methods don"t provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the "org.apache.flink.api.common.functions.FlatMapFunction" interface.
    Otherwise the type has to be specified explicitly using type information.

这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。

所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息

我们再看一段代码:

SingleOutputStreamOperator> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

为什么map后面也需要指定类型呢?

因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。

更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

「Flink」使用Java lambda表达式实现Flink WordCount

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

「Flink」使用Java lambda表达式实现Flink WordCount

本篇我们将使用Java语言来实现Flink的单词统计。代码开发环境准备导入Flink 1.9 pom依赖 org.apache.flink flink-java 1.9.0
「Flink」使用Java lambda表达式实现Flink WordCount
2016-09-15

java lambda表达式怎么使用

Java lambda表达式是Java 8引入的一种新特性,它可以简化代码的编写,尤其是在处理函数式接口时非常方便。下面是使用lambda表达式的一些常用方法:1. 使用匿名内部类的方式创建函数式接口的实例:```MyInterface m
2023-09-23

Java Lambda表达式怎么使用

这篇文章主要介绍“Java Lambda表达式怎么使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Java Lambda表达式怎么使用”文章能帮助大家解决问题。一、背景Lambda表达式是Java
2023-06-29

使用Java 8中的Lambda表达式实现工厂模式

前言工厂模式是面向对象设计模式中大家最为熟知的设计模式之一。传统的实现方式大家都在熟悉不过了,今天将向大家介绍使用Java8 Lambda 表达式更加优雅的实现工厂模式。封面工厂模式在java中最常用的设计模式之一,它提供了一种很好的实例化
2023-05-31

Java的Lambda表达式使用实例分析

这篇文章主要讲解了“Java的Lambda表达式使用实例分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java的Lambda表达式使用实例分析”吧!Lambada 简介lambda 表达
2023-07-05

Java的Lambda表达式如何使用

这篇文章主要介绍“Java的Lambda表达式如何使用”,在日常操作中,相信很多人在Java的Lambda表达式如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java的Lambda表达式如何使用”的疑
2023-06-30

Java中Lambda表达式基础及使用

目录一、举例说明1、无参无返回1.1 定义一个接口1.2接口实现类1.3 测试类2、有参无返回代码示例3、有参有返回二、简单事项1、省略模式2、注意事项三、Lambda表达式和匿名内部类的区别1、所需类型不同:2、使用限制不同:3、实现原理不同:标准格式:三要
2019-12-07

Java中Lambda表达式使用及详解

Java中Lambda表达式使用及详解 前言 一、Lambda表达式的简介 Lambda表达式(闭包):java8的新特性,lambda运行将函数作为一个方法的参数,也就是函数作为参数传递到方法中。使用lambda表达式可以让代码更加
2023-08-16

lambda表达式如何在java中使用

这篇文章给大家介绍lambda表达式如何在java中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Lamda表达式λ 希腊字母表中排序第十一位字母,英语名称为Lambda避免匿名内部类定义过多其实质属于函数式 编程
2023-06-14

如何使用.NET Lambda表达式实现委托

本篇内容介绍了“如何使用.NET Lambda表达式实现委托”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!.NET Lambda表达式的写法
2023-06-18

Java中Lambda表达式使用详细解读

这篇文章主要介绍了Java中Lambda表达式使用及详解,lambda运行将函数作为一个方法的参数,也就是函数作为参数传递到方法中,使用lambda表达式可以让代码更加简洁,需要的朋友可以参考下
2023-05-18

Java如何使用 Lambda 表达式实现超强的排序功能

这篇文章主要介绍Java如何使用 Lambda 表达式实现超强的排序功能,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!首先,我们定义一个基础类,后面我们将根据这个基础类演示如何在内存中排序。@Data@NoArgsC
2023-06-25

Java学习之Lambda表达式的使用详解

Lambda表达式是JavaSE8中一个重要的新特性,允许通过表达式来代替功能接口。本文将通过一些简单的示例和大家讲讲Lamda表达式的使用,感兴趣的可以了解一下
2022-12-26

如何在Java项目中使用lambda表达式

这期内容当中小编将会给大家带来有关如何在Java项目中使用lambda表达式,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Java8引入了lambda表达式。lambda表达式并不是新功能,只是为了方便代
2023-05-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录