Using Lambda Expressions in Flink

Since the release of Java 8, lambda expressions have become common in stream-processing code because they are concise and easy to read.

Flink supports lambda expressions as well. As an example, let's rewrite the WordCount sample:

DataSource<String> lines = env.fromElements(
    "Apache Flink is a community-driven open source framework for distributed big data analytics,",
    "like Hadoop and Spark. The core of Apache Flink is a distributed streaming dataflow engine written",
    ...
);
// Emit a (word, 1) tuple for every word, then group by the word (field 0)
// and sum the counts (field 1). The output type must be a tuple type,
// otherwise groupBy(0) cannot address fields by position.
lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word : line.split("\\W+")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}).groupBy(0).sum(1).print();

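The code is simple: it splits each line into words on runs of non-word characters (the regex \W+), pairs each word with the number 1 in a Tuple2, then groups the tuples by word and sums the counts to get the number of occurrences of each word.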
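
To make the split, pair and aggregate steps concrete, here is a minimal sketch of the same logic in plain Java, without Flink. The class name LocalWordCount and the method count are hypothetical names chosen for illustration. Unlike the Flink snippet, this sketch also skips the empty string that split returns when a line starts with a non-word character.

```java
import java.util.Map;
import java.util.TreeMap;

final class LocalWordCount {

    // Counts word occurrences across all lines, mirroring
    // flatMap -> groupBy(0) -> sum(1) in a single local pass.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Same regex as the Flink job: split on runs of non-word characters.
            for (String word : line.split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // leading "" when the line starts with punctuation
                }
                // Equivalent of emitting (word, 1) and summing per key.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("Apache Flink is a framework,", "like Hadoop. Apache Flink"));
    }
}
```

The Flink version does the same work, but distributes the grouping and summing across the cluster, which is why it needs to know the tuple's types to build serializers.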

Now let's try replacing the FlatMapFunction with a lambda expression:

lines.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
}).groupBy(0).sum(1).print();

Running this code, however, throws the following exception:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. 
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1653)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1639)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:573)
at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:188)
at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
at TestFlink.main(TestFlink.java:21)

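This happens because Flink uses the generic type parameters of user-defined functions to create serializers. With an anonymous class, the compiler records those type parameters in the class file, so Flink can read them back. A lambda expression is not an anonymous class, though, and javac does not store its generic types in the class file.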
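
The difference can be observed with plain Java reflection. The following is a minimal sketch, not Flink's actual TypeExtractor logic: FlatMapper is a hypothetical stand-in for Flink's FlatMapFunction, and java.util.function.Consumer stands in for Collector. It assumes the code is compiled with javac.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Consumer;

final class LambdaTypeInfoDemo {

    // Hypothetical stand-in for Flink's FlatMapFunction; not a Flink type.
    interface FlatMapper<T, O> {
        void flatMap(T value, Consumer<O> out);
    }

    // Reports what generic interface information the function's class carries.
    static String describe(FlatMapper<?, ?> fn) {
        Type iface = fn.getClass().getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            Type[] args = ((ParameterizedType) iface).getActualTypeArguments();
            return "parameterized " + Arrays.toString(args);
        }
        return "raw"; // only the erased interface is visible
    }

    // Anonymous class: the type arguments are recorded in its class file.
    static FlatMapper<String, Integer> anonymousClass() {
        return new FlatMapper<String, Integer>() {
            @Override
            public void flatMap(String value, Consumer<Integer> out) {
                out.accept(value.length());
            }
        };
    }

    // Lambda: the runtime-generated class implements the raw interface.
    static FlatMapper<String, Integer> lambda() {
        return (value, out) -> out.accept(value.length());
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + describe(anonymousClass()));
        System.out.println("lambda:          " + describe(lambda()));
    }
}
```

For the anonymous class, describe reports a parameterized interface with String and Integer as type arguments. For the lambda it reports a raw interface, which is the situation Flink's exception message complains about.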

There are two ways to fix this:

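The first is the one the exception message hints at: compile with the Eclipse JDT compiler, which preserves the type information that lambda expressions need. To use the Eclipse JDT compiler with Maven, add the following plugin to pom.xml: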

<plugins>
    <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <compilerId>jdt</compilerId>
        </configuration>
        <dependencies>
            <dependency>
                <groupId>org.eclipse.tycho</groupId>
                <artifactId>tycho-compiler-jdt</artifactId>
                <version>0.21.0</version>
            </dependency>
        </dependencies>
    </plugin>
</plugins>

The other is to use the returns method that Flink provides to declare the return type of flatMap explicitly:

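lines.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
})
// Tell Flink explicitly that the output is Tuple2<String, Integer>.
// The raw (TypeInformation) cast is unchecked.
.returns((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class))
.groupBy(0).sum(1)
.print();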

The returns method takes a TypeInformation argument; here we create a TupleTypeInfo to specify the field types of the tuple.