「Flink」使用Java lambda表达式实现Flink WordCount
本篇我们将使用Java语言来实现Flink的单词统计。
代码开发
环境准备
导入Flink 1.9 pom依赖
<dependencies>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>1.9.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_2.11artifactId>
<version>1.9.0version>
dependency>
<dependency>
<groupId>org.apache.commonsgroupId>
<artifactId>commons-lang3artifactId>
<version>3.7version>
dependency>
dependencies>
构建Flink流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
自定义source
每秒生成一行文本
DataStreamSource wordLineDS = env.addSource(new RichSourceFunction() {
private boolean isCanal = false;
private String[] words = {
"important oracle jdk license update",
"the oracle jdk license has changed for releases starting april 16 2019",
"the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
"personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
"downloading and using this product an faq is available here ",
"commercial license and support is available with a low cost java se subscription",
"oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
};
@Override
public void run(SourceContext ctx) throws Exception {
// 每秒发送一行文本
while (!isCanal) {
int randomIndex = RandomUtils.nextInt(0, words.length);
ctx.collect(words[randomIndex]);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isCanal = true;
}
});
单词计算
// 3. 单词统计
// 3.1 将文本行切分成一个个的单词
SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
// 切分单词
Arrays.stream(line.split(" ")).forEach(word -> {
ctx.collect(word);
});
}).returns(Types.STRING);
//3.2 将单词转换为一个个的元组
SingleOutputStreamOperator> tupleDS = wordsDS
.map(word -> Tuple2.of(word, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
// 3.3 按照单词进行分组
KeyedStream, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);
// 3.4 对每组单词数量进行累加
SingleOutputStreamOperator> resultDS = keyedDS
.timeWindow(Time.seconds(3))
.reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));
resultDS.print();
参考代码
public class WordCount {
public static void main(String[] args) throws Exception {
// 1. 构建Flink流式初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 自定义source - 每秒发送一行文本
DataStreamSource wordLineDS = env.addSource(new RichSourceFunction() {
private boolean isCanal = false;
private String[] words = {
"important oracle jdk license update",
"the oracle jdk license has changed for releases starting april 16 2019",
"the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
"personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
"downloading and using this product an faq is available here ",
"commercial license and support is available with a low cost java se subscription",
"oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
};
@Override
public void run(SourceContext ctx) throws Exception {
// 每秒发送一行文本
while (!isCanal) {
int randomIndex = RandomUtils.nextInt(0, words.length);
ctx.collect(words[randomIndex]);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isCanal = true;
}
});
// 3. 单词统计
// 3.1 将文本行切分成一个个的单词
SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
// 切分单词
Arrays.stream(line.split(" ")).forEach(word -> {
ctx.collect(word);
});
}).returns(Types.STRING);
//3.2 将单词转换为一个个的元组
SingleOutputStreamOperator> tupleDS = wordsDS
.map(word -> Tuple2.of(word, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
// 3.3 按照单词进行分组
KeyedStream, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);
// 3.4 对每组单词数量进行累加
SingleOutputStreamOperator> resultDS = keyedDS
.timeWindow(Time.seconds(3))
.reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));
resultDS.print();
env.execute("app");
}
}
Flink对Java Lambda表达式支持情况
Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。
我们来看下上述的这段代码:
SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
// 切分单词
Arrays.stream(line.split(" ")).forEach(word -> {
ctx.collect(word);
});
}).returns(Types.STRING);
之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码
@Public
@FunctionalInterface
public interface FlatMapFunctionextends Function, Serializable {
void flatMap(T value, Collectorout) throws Exception;
}
我们发现 flatMap的第二个参数是Collector
void flatMap(T value, Collector out)
这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of "Collector" are missing. In many cases lambda methods don"t provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the "org.apache.flink.api.common.functions.FlatMapFunction" interface. Otherwise the type has to be specified explicitly using type information.
这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。
所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息
我们再看一段代码:
SingleOutputStreamOperator> tupleDS = wordsDS
.map(word -> Tuple2.of(word, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
为什么map后面也需要指定类型呢?
因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。
更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341