我有一个数据帧df
,其中的列id (long), name(String)
随机分布在数据帧的所有行中。目标是将数据帧拆分为唯一值 id
s,然后将sorted
数据帧(wrt name
字段)写出到磁盘。有两种方法可以执行此操作:
第一种方法是先对数据帧进行排序:
//Approach 1: Global sorting (Psuedo code)
val df_sorted = df.sort(col("name"))
val unique_ids = df.distinct().head().getLong(0)
// write out to disk
for specificID in unique_ids
df_unique = df.filter(col("id") = specificID))
// write to disk
df_unique.write(...)
第二种方法是在写入磁盘之前对数据帧进行排序
//Approach 2: Local sorting (Psuedo code)
val unique_ids = df.distinct().head().getLong(0)
// write out to disk
for all specificID in unique_ids
df_unique = df.filter(col("id") = specificID)).sort(col("name"))
// write to disk
df_unique.write(...)
哪一个应该提供更好的性能?在 approach(1) 中,整个数据帧可以按O(NlogN)
排序,其中N
是行数。在第二种方法中,可以按k/N (k/Nlogk/N)
排序,其中k
是唯一 id 的数量(假设均匀分布)。更重要的是,它可以在分区内本地排序。
对于第二种情况下的初学者,数据无法在分区内本地排序。由于我们假设在filter
之后应用sort
的数据帧的所有行中随机分布是分布式操作,并且与第一种情况相同,需要完全洗牌。要使其本地化,您必须coalesce
到单个分区,但这再次是一个完整的广泛操作。
第二个问题是假设第一种情况是O(NlogN)。即使你假设Spark在O(NlogN)中排序,你仍然会有O(KN)来应用所有过滤器。在实践中,Spark 使用类似于存储桶排序的方法进行排序:
- 首先,它对数据构建
RangePartitioner
- 使用分区程序对数据进行重新分区
- 在每个分区内本地排序
我非分布式应用程序,人们可以争辩说这个 O(N + K) 平均而言,但它显然忽略了应用程序的分布式性质。将数据从一个分区移动到另一个分区与在内存中移动数据不同,并且有其自身的复杂性,这进一步取决于配置和群集拓扑。
最后DataFrame
API 是声明性的。这意味着你写的不一定是你得到的。
抛开所有细节,我们可以简化这个问题,并选择两种基本策略之一
-
首先排序(范围分区)
- 按名称划分分区数据的范围
- 对于每个分区:
- 按名称对分区内的数据进行排序
- 为每个 ID 使用单独的文件写入数据
- (可选)- 为每个 ID 合并部分文件
-
首先按 id 分区
- (范围)按 ID 对数据进行分区:
- 对于每个分区:
- 按名称对分区内的数据进行排序
- 为每个 ID 使用单独的文件写入数据
如您所见,这基本上是相同的算法。