我想用 Apache Flink 批处理两个文件,一个接一个。
举一个具体的例子:假设我想为每一行分配一个索引,这样第二个文件中的行就会跟在第一行之后。以下代码不会这样做,而是在两个文件中交错行:
val env = ExecutionEnvironment.getExecutionEnvironment
val text1 = env.readTextFile("/path/to/file1")
val text2 = env.readTextFile("/path/to/file2")
val union = text1.union(text2).flatMap { ... }
我想确保所有text1
首先通过flatMap
运算符发送,然后通过text2
发送所有信息。推荐的方法是什么?
提前感谢您的帮助。
>DataSet.union()
不跨输入提供任何顺序保证。来自同一输入分区的记录将保持顺序,但将与来自其他输入的记录合并。
但还有一个更根本的问题。Flink 是一个并行数据处理器。并行处理数据时,无法保留全局顺序。例如,当 Flink 并行读取文件时,它会尝试拆分这些文件并独立处理每个拆分文件。拆分是在没有任何特定顺序的情况下分发的。因此,单个文件的记录已经被打乱了。您需要将整个作业的并行度设置为 1,并实现自定义InputFormat
才能完成此操作。
你可以让它工作,但它不会并行,你需要调整很多东西。我不认为 Flink 是完成此类任务的最佳工具。 您是否考虑过使用简单的 unix 命令行工具来连接您的文件?