Spark 1.5.2:数据帧的前后排序



我有一个数据帧df,其中的列id (long), name(String)随机分布在数据帧的所有行中。目标是将数据帧拆分为唯一值 id s,然后将sorted数据帧(wrt name字段)写出到磁盘。有两种方法可以执行此操作:

第一种方法是先对数据帧进行排序:

//Approach 1: Global sorting (Psuedo code)
val df_sorted = df.sort(col("name"))
val unique_ids = df.distinct().head().getLong(0)
// write out to disk 
for specificID in unique_ids
     df_unique = df.filter(col("id") = specificID))
     // write to disk
     df_unique.write(...)

第二种方法是在写入磁盘之前对数据帧进行排序

//Approach 2: Local sorting (Psuedo code)
val unique_ids = df.distinct().head().getLong(0)
// write out to disk 
for all specificID in unique_ids
     df_unique = df.filter(col("id") = specificID)).sort(col("name"))
     // write to disk
     df_unique.write(...)

哪一个应该提供更好的性能?在 approach(1) 中,整个数据帧可以按O(NlogN)排序,其中N是行数。在第二种方法中,可以按k/N (k/Nlogk/N)排序,其中k是唯一 id 的数量(假设均匀分布)。更重要的是,它可以在分区内本地排序。

对于第二种情况下的初学者,数据无法在分区内本地排序。由于我们假设在filter之后应用sort的数据帧的所有行中随机分布是分布式操作,并且与第一种情况相同,需要完全洗牌。要使其本地化,您必须coalesce到单个分区,但这再次是一个完整的广泛操作。

第二个问题是假设第一种情况是O(NlogN)。即使你假设Spark在O(NlogN)中排序,你仍然会有O(KN)来应用所有过滤器。在实践中,Spark 使用类似于存储桶排序的方法进行排序:

  • 首先,它对数据构建RangePartitioner
  • 使用分区程序对数据进行重新分区
  • 在每个分区内本地排序

我非分布式应用程序,人们可以争辩说这个 O(N + K) 平均而言,但它显然忽略了应用程序的分布式性质。将数据从一个分区移动到另一个分区与在内存中移动数据不同,并且有其自身的复杂性,这进一步取决于配置和群集拓扑。

最后DataFrame API 是声明性的。这意味着你写的不一定是你得到的。

抛开所有细节,我们可以简化这个问题,并选择两种基本策略之一

  1. 首先排序(范围分区)

    • 按名称划分分区数据的范围
    • 对于每个分区:
      • 按名称对分区内的数据进行排序
      • 为每个 ID 使用单独的文件写入数据
    • (可选)- 为每个 ID 合并部分文件
  2. 首先按 id 分区

    • (范围)按 ID 对数据进行分区:
    • 对于每个分区:
      • 按名称对分区内的数据进行排序
      • 为每个 ID 使用单独的文件写入数据

如您所见,这基本上是相同的算法。

相关内容

  • 没有找到相关文章

最新更新