I am using Flink 1.13.0 and have the following simple code snippet:
import org.apache.flink.table.api.bridge.scala.table2RowDataSet
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object HelloFlinkBatchTable {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
    val tenv = TableEnvironment.create(settings)
    val words = tenv.fromValues("hello", "world", "hadoop", "spark", "world").as("word")
    words.collect().foreach(println)
    words.printSchema()
    tenv.createTemporaryView("words", words)

    // collect works on TableResult
    val result = tenv.executeSql("select word from words")
    result.collect()

    // collect doesn't work on the Table
    // ERROR: Table cannot be converted into a DataSet. It is not part of a batch table environment.
    words.collect()
  }
}
Why does TableResult.collect work while Table.collect does not? The error is: "Table cannot be converted into a DataSet. It is not part of a batch table environment." I believe I have specified the batch environment correctly in my code.
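The implicit conversion table2RowDataSet is actually deprecated, but implicit conversions are generally hard to deprecate. Table has no collect method of its own, so words.collect() only compiles because this imported implicit converts the Table into a DataSet, and that conversion requires the legacy BatchTableEnvironment rather than the unified TableEnvironment created here.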
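The DataSet API is reaching end of life, and its functionality will be fully integrated into TableEnvironment and StreamExecutionEnvironment in the medium term.
TableResult.collect is the officially supported and stable way of retrieving results. The other implicits will be updated in Flink 1.14; see FLINK-22590.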
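As a minimal sketch of that recommendation, the same words can be read back through Table.execute(), which returns a TableResult, without importing any DataSet implicit. The object name CollectWithoutDataSet and the helper collectWords are made up for illustration, and the snippet assumes the Flink 1.13 table planner is on the classpath (the Blink planner is already the default there, so useBlinkPlanner() is left out).

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import scala.collection.mutable.ListBuffer

// Hypothetical example object; not part of the original question.
object CollectWithoutDataSet {

  // Runs the batch job and returns the values of the single "word" column.
  def collectWords(): List[String] = {
    val settings = EnvironmentSettings.newInstance().inBatchMode().build()
    val tenv = TableEnvironment.create(settings)
    val words = tenv.fromValues("hello", "world", "hadoop", "spark", "world").as("word")

    // Table.execute() submits the job and returns a TableResult;
    // TableResult.collect() returns a CloseableIterator[Row].
    val rows = words.execute().collect()
    val buffer = ListBuffer.empty[String]
    try {
      while (rows.hasNext) {
        buffer += rows.next().getField(0).toString
      }
    } finally {
      rows.close() // releases the resources held by the running job
    }
    buffer.toList
  }

  def main(args: Array[String]): Unit =
    collectWords().foreach(println)
}
```

Because the bridge.scala import is absent, no implicit conversion to DataSet is in scope, so an accidental words.collect() would fail at compile time instead of at runtime.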