Flink TableAPI中出现异常



我正在尝试运行下面的简单Flink作业,以使用TableAPI计数单词。使用DataStream API读取数据流,并使用StreamTableEnvironment API创建表环境。我正处于异常以下。有人能帮我代码出了什么问题吗?我使用的是Flink 1.8版本。

异常:

**Exception in thread "main" org.apache.flink.table.api.TableException: Only the first field can reference an atomic type.**
at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1117)
at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1112)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:1112)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:546)
at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:91)
at Udemy_Course.TableAPIExample.CountWordExample.main(CountWordExample.java:30)

代码

public class CountWordExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment=StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnvironment=StreamTableEnvironment.create(environment);

DataStream<WC> streamOfWords =
environment.fromElements(
new WC("Hello",1L),
new WC("Howdy",1L),
new WC("Hello",1L),
new WC("Hello",1L));
Table t1 = streamTableEnvironment.fromDataStream(streamOfWords, "word, count");
Table result = streamTableEnvironment.sqlQuery("select word, count(word) as wordcount 
from " + t1 + " group by word");

streamTableEnvironment.toRetractStream(result, CountWordExample.class ).print();
environment.execute();
}
public static class WC {
private String word;
private Long count;
public WC() {}
public WC(String word,Long count) {
this.word=word;
this.count=count;
}
}
}

运行此程序所需的只是更改WC中的私有字和计数字段,使其成为公共字段,并在toRetractStream中使用Row.class而不是CountWordExample.class。

这些字段必须是公共的,或者具有可公开访问的getter和setter,SQL引擎才能使用它们。

public class CountWordExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment=StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnvironment=StreamTableEnvironment.create(environment);

DataStream<WC> streamOfWords =
environment.fromElements(
new WC("Hello",1L),
new WC("Howdy",1L),
new WC("Hello",1L),
new WC("Hello",1L));
Table t1 = streamTableEnvironment.fromDataStream(streamOfWords, "word, count");
Table result = streamTableEnvironment.sqlQuery("select word, count(word) as wordcount from " + t1 + " group by word");
streamTableEnvironment.toRetractStream(result, Row.class).print();
environment.execute();
}
public static class WC {
public String word;
public Long count;
public WC() {}
public WC(String word,Long count) {
this.word=word;
this.count=count;
}
}
}

相关内容

  • 没有找到相关文章

最新更新