Spark-读取单个CSV文件,处理结果并将结果写入单个CSV文件中,同时保持原始行顺序



我想从Spark中读取一个CSV文件(小于50MB(,并执行一些join&过滤器操作。CSV文件中的行按某些条件排序(在本例中为Score(。我想把结果保存在一个CSV文件中,原始行的顺序保持不变。

输入CSV文件:

Id, Score
5, 100
3, 99
6, 98
7, 95

经过一段时间的加入&过滤操作:

val data = spark.read.option("header", "true").csv("s3://some-bucket/some-dir/123.csv")
val results = data
.dropDuplicates($"some_col")
.filter(x => ...)
.join(anotherDataset, Seq("some_col"), "left_anti")
results.repartition(1).write.option("header", "true").csv("...")

预期输出:

Id, Score
5, 100
6, 98

(ID 3和7被过滤掉(

由于Spark可能会将数据加载到多个分区中,如何保持原始顺序?

在执行任何改变记录顺序的操作(如分组方式、联接、不同等(之前,您需要使用monetically_increasing_id((附加一列。此函数可以帮助您在分区内重新创建记录的顺序。

"生成的ID保证是单调递增的和唯一的,但不是连续的。当前的实现将分区ID放在较高的31位中,而较低的33位表示每个分区内的记录号"0";

val data = spark.read.option("header", "true").csv("s3://some-bucket/some-dir/123.csv")
val results = data
.withColumn("rowId",monotonically_increasing_id())
.dropDuplicates($"some_col"). // this might need to be replaced with a window function.
.filter(x => ...)
.join(anotherDataset, Seq("some_col"), "left_anti")
results.repartition(1)
.orderBy("rowId")
.write.option("header", "true").csv("...")

请注意,由于某些原因,sparksql不包括用于获取spark分区id或spark分区行号的简单内置函数,但幸运的是,单调increasing_id做得足够好。

最新更新