如何取消堆叠数据集(使用透视)



我在更大的堆叠数据集上尝试了1.6的新"枢轴"函数。它有 5,656,458 行,IndicatorCode列有 1344 个不同的代码。

这个想法是使用透视来"取消堆叠"(用熊猫术语(这个数据集,并为每个指标代码设置一列。

schema = StructType([ 
   StructField("CountryName", StringType(), True), 
   StructField("CountryCode", StringType(), True), 
   StructField("IndicatorName", StringType(), True), 
   StructField("IndicatorCode", StringType(), True), 
   StructField("Year", IntegerType(), True), 
   StructField("Value", DoubleType(), True)  
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv', 
                            format='com.databricks.spark.csv', 
                            header='true', 
                            schema=schema)
data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", ".", "_"))
                      .select(["CountryCode", "IndicatorCode2", "Year", "Value"])
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])
             .pivot("IndicatorCode2", columns)
             .max("Value")

虽然这成功返回,但data3.first()从未返回结果(我在 10 分钟后使用 3 个内核中断了我的独立

(。

我使用 RDDaggregateByKey 的方法效果很好,所以我不是在寻找有关如何做到这一点的解决方案,而是使用 DataFrame 进行透视是否也可以解决问题。

好吧,透视通常不是一个非常有效的操作,使用 API DataFrame您可以做太多事情。您可以尝试的一件事是repartition您的数据:

(data2
  .repartition("Year", "CountryCode")
  .groupBy("Year", "CountryCode")
  .pivot("IndicatorCode2", columns)
  .max("Value"))

甚至聚合:

from pyspark.sql.functions import max
(df
    .groupBy("Year", "CountryCode", "IndicatorCode")
    .agg(max("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode", columns)
    .max("Value"))

在申请pivot之前。两种解决方案背后的理念是相同的。与其移动大型扩展数据Rows不如移动狭窄的密集数据并在本地扩展。

> Spark 2.0 引入了 SPARK-13749 一种透视实现,对于大量透视列值来说速度更快。

在我的计算机上使用 Spark 2.1.0 进行测试,您的示例现在在 48 秒内运行。

相关内容

  • 没有找到相关文章

最新更新