Java8出来之后,lambda表达式由于简单易读,在流式计算中的使用开始变得普遍。
同样,Flink也支持lambda表达式,例如我们改写一下wordcount样例
1 | DataSource<String> lines = env.fromElements( |
这段代码很简单,先把每一行按空格拆分成若干单词,并将每个单词和数字1组成一个Tuple,然后把所有Tuple按照单词聚合,计算出每个单词的出现次数
尝试用lambda表达式来替换FlatMapFunction,代码如下
1 | lines.flatMap((line, out) -> { |
但当运行这段代码时,会抛出如下异常:
1 | Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. |
这是因为Flink在用户自定义的函数中会使用泛型来创建serializer,当我们使用匿名函数时,类型信息会被保留。但Lambda表达式并不是匿名函数,所以javac编译的时候并不会把泛型保存到class文件里。
解决办法有两种:
第一种办法在异常中已经提示,使用Eclipse JDT编译器会保留对lambda表达式来说必要的类型信息。在Maven中使用Eclipse JDT编译器,只需要在把下面的插件加入到pom.xml中
1 | <plugins> |
另一种办法是,使用Flink提供的returns方法来指定flatMap的返回类型,
1 | text.flatMap((line, out) -> { |
returns函数接收TypeInformation类型的参数,这里我们创建TupleTypeInfo来指定Tuple的参数类型。