用SPARK解析许多CSV文件时添加行号



我目前正在放入一个大型数据框架中,有大量的小型CSV文件。

的线
df = spark.read.format("csv").load("file*.csv")

由于如何解析数据集是构造的,所以我需要 line 数字df中每个行的相应源csv-file 中。是否有一些简单的方法来实现这一目标(最好是通过在input_file_name()zipwithindex()上进行分组的组合而不诉诸于重建它们(?

例如

# file1.csv
col1, col2
A, B
C, D

# file2.csv
col1, col2
E, F
G, H

我需要一个相当于

的结果数据框
row, col1, col2
1, A, B
2, C, D
1, E, F
2, G, H

如果您需要数据框中的row_number任意顺序,则可以使用以下替代方案。

如果使用Spark 2.x

类似这样的东西

val df = spark.read.format("csv").load("file*.csv").withColumn("rowId", monotonically_increasing_id())

另一种选择是使用row_number。但是,如果您在DataFrame中有分区

,那可行

之类的东西
val df = spark.read.format("csv").load("file*.csv").withColumn("rowId", row_number().over(Window.partitionBy("col1")

这将确保每个分区填充行号。

但是,如果您需要确切的订购,恐怕没有"闪电般的"方法。原因是一旦您将数据读为数据框,它就会将数据持续存在的顺序。

您可以在单个计算机中使用Java程序合并CSV文件,并在程序中添加行号。

最新更新