Flink DataSource Iterate



我正在尝试迭代数据源:


val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val job = Job.getInstance
FileInputFormat.addInputPath(
job,
new Path("file.parquet.gz")
)
val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
new HadoopInputFormat(
new AvroParquetInputFormat[GenericRecord],
classOf[Void],
classOf[GenericRecord],
job
)
val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)

当我做data.print时,我可以看到元组中的数据。

但当我这样做时:


data.map
{
res =>
println("!!!!!!!!!!!111")
println( res.f1)
}

没有打印任何内容。

我想迭代数据源并获取GenericRecord。请帮帮我。

要在不调用printcollect的情况下执行Flink批处理程序,需要调用env.execute()。在没有上述API调用的情况下,只有此调用才会触发程序的执行。

您可以使用data.collect,然后使用:data.iterator((.next((迭代

相关内容

  • 没有找到相关文章

最新更新