我正在尝试迭代数据源:
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val job = Job.getInstance
FileInputFormat.addInputPath(
job,
new Path("file.parquet.gz")
)
val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
new HadoopInputFormat(
new AvroParquetInputFormat[GenericRecord],
classOf[Void],
classOf[GenericRecord],
job
)
val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)
当我做data.print时,我可以看到元组中的数据。
但当我这样做时:
data.map
{
res =>
println("!!!!!!!!!!!111")
println( res.f1)
}
没有打印任何内容。
我想迭代数据源并获取GenericRecord。请帮帮我。
要在不调用print
或collect
的情况下执行Flink批处理程序,需要调用env.execute()
。在没有上述API调用的情况下,只有此调用才会触发程序的执行。
您可以使用data.collect,然后使用:data.iterator((.next((迭代