我在Java中使用Flink编写了一个简单的程序,该程序将文件或文本作为输入,然后使用flatMap函数打印所有单词。
这是我的代码:
final ParameterTool params = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
// show user defined parameters in the apache flink dashboard
DataStream<String> dataStream;
if(params.has("input"))
{
System.out.println("Executing Words example with file input");
dataStream = env.readTextFile(params.get("input"));
}else if (params.has("host") && params.has("port"))
{
System.out.println("Executing Words example with socket stream");
dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
}
else {
System.exit(1);
return;
}
DataStream<String> wordDataStream = dataStream.flatMap(
(String sentence, Collector<String> out) -> {
for(String word: sentence.split(" "))
out.collect(word);
});
wordDataStream.print();
env.execute("Word Split");
但是当我使用以下命令运行它时:
bin/flink run -c Words FlinkExample-0.0.1-SNAPSHOT.jar --host localhost --port 9999
我收到以下错误:
程序失败,出现以下异常:
由于类型擦除,无法自动确定函数"main(Words.java:32("的返回类型。可以通过对转换调用的结果使用 returns(...( 方法,或者让函数实现"ResultTypeQueryable"接口来提供类型信息提示。
(第 32 行是指第二个数据流的声明(
我认为错误消息的简短描述非常好,但让我扩展一下。
为了执行一个程序,Flink 需要知道所处理值的类型,因为它需要序列化和反序列化它们。Flink 的类型系统基于TypeInformation
描述数据类型。当你指定一个函数时,Flink 会尝试推断出该函数的返回类型。对于示例的 FlatMapFunction,传递给Collector
的对象的类型。
遗憾的是,某些 Lambda 函数会丢失此信息,因为类型擦除,因此 Flink 无法自动推断类型。因此,必须显式声明返回类型。
您可以按如下方式提供类型信息:
DataStream<String> wordDataStream = dataStream.flatMap(
(String sentence, Collector<String> out) -> {
for(String word: sentence.split(" "))
out.collect(word); // collect objects of type String
}
).returns(Types.STRING); // declare return type of flatmap lambda function as String
我也面临着同样的问题。我尝试了下面的链接,它对我有用。
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/java8.html#compiler-limitations
虽然它是旧版本,但它对我有用。 我无法执行 maven 编译安装,但我能够运行 java main .class。 如果执行 maven 编译安装很重要,您应该在尝试之前三思而后行。
或者你可以创建一个函数类:
new FlatMapFunction<Input, Output>() {
@Override
public void flatMap(Input input, Collector<Output> collector) throws Exception {
...
}
}