我在更大的堆叠数据集上尝试了1.6的新"枢轴"函数。它有 5,656,458 行,IndicatorCode
列有 1344 个不同的代码。
这个想法是使用透视来"取消堆叠"(用熊猫术语(这个数据集,并为每个指标代码设置一列。
schema = StructType([
StructField("CountryName", StringType(), True),
StructField("CountryCode", StringType(), True),
StructField("IndicatorName", StringType(), True),
StructField("IndicatorCode", StringType(), True),
StructField("Year", IntegerType(), True),
StructField("Value", DoubleType(), True)
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
format='com.databricks.spark.csv',
header='true',
schema=schema)
data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", ".", "_"))
.select(["CountryCode", "IndicatorCode2", "Year", "Value"])
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])
.pivot("IndicatorCode2", columns)
.max("Value")
虽然这成功返回,但data3.first()
从未返回结果(我在 10 分钟后使用 3 个内核中断了我的独立
我使用 RDD
和 aggregateByKey
的方法效果很好,所以我不是在寻找有关如何做到这一点的解决方案,而是使用 DataFrame 进行透视是否也可以解决问题。
好吧,透视通常不是一个非常有效的操作,使用 API DataFrame
您可以做太多事情。您可以尝试的一件事是repartition
您的数据:
(data2
.repartition("Year", "CountryCode")
.groupBy("Year", "CountryCode")
.pivot("IndicatorCode2", columns)
.max("Value"))
甚至聚合:
from pyspark.sql.functions import max
(df
.groupBy("Year", "CountryCode", "IndicatorCode")
.agg(max("Value").alias("Value"))
.groupBy("Year", "CountryCode")
.pivot("IndicatorCode", columns)
.max("Value"))
在申请pivot
之前。两种解决方案背后的理念是相同的。与其移动大型扩展数据Rows
不如移动狭窄的密集数据并在本地扩展。
> Spark 2.0 引入了 SPARK-13749 一种透视实现,对于大量透视列值来说速度更快。
在我的计算机上使用 Spark 2.1.0 进行测试,您的示例现在在 48 秒内运行。