Hive和Spark窗口功能的数据混洗



对已经在同一节点上的数据使用配置单元窗口函数时,是否会发生数据混洗?

具体在下面的例子中,在使用窗口函数之前,"City"已经使用Spark repartition()函数对数据进行了重新分区,这应该确保城市"A"的所有数据共同定位在同一节点上(假设一个城市的数据可以适应一个节点)。

df = sqlContext.createDataFrame(
    [('A', '1', 2009, "data1"),
     ('A', '1', 2015, "data2"),
     ('A', '22', 2015, "data3"),
     ('A', '22', 2016, "data4"),
     ('BB', '333', 2014, "data5"), 
     ('BB', '333', 2012, "data6"), 
     ('BB', '333', 2016, "data7")
    ],
    ("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# |  BB|   333|2012|data6|
# |  BB|   333|2014|data5|
# |  BB|   333|2016|data7|
# |   A|    22|2016|data4|
# |   A|    22|2015|data3|
# |   A|     1|2009|data1|
# |   A|     1|2015|data2|
# +----+------+----+-----+

然后,我必须用"Person"对窗口函数进行分区,这不是Spark repartition()中的分区键,如下所示。

df.registerTempTable('example')
sqlStr = """
    select *,
        row_number() over (partition by Person order by year desc) ranking
    from example
"""
sqlContext.sql(sqlStr).show(100)
# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# |  BB|   333|2016|data7|      1|
# |  BB|   333|2014|data5|      2|
# |  BB|   333|2012|data6|      3|
# |   A|     1|2015|data2|      1|
# |   A|     1|2009|data1|      2|
# |   A|    22|2016|data4|      1|
# |   A|    22|2015|data3|      2|
# +----+------+----+-----+-------+

以下是我的问题:

  1. Spark的"重新分区"和Hive的"分区依据"之间有什么关系或区别吗?在引擎盖下,它们在Spark上被翻译成同一个东西了吗?

  2. 我想检查一下我以下的理解是否正确。即使所有已经在同一节点上的数据,如果我调用Spark df.repartition('A_key_different_from_current_partition_key'),数据也会被混洗到多个节点,而不是在同一个节点上。

顺便说一句,我还想知道用Spark窗口函数实现示例Hive查询是否简单。

窗口函数中的partition by子句和repartition都使用相同的TungstenExchange机制执行。当你分析执行计划时,你会看到这一点:

sqlContext.sql(sqlStr).explain()
## == Physical Plan ==
## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
## +- Sort [Person#1 ASC,year#2L DESC], false, 0
##    +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
##       +- Project [City#0,Person#1,year#2L,data#3]
##          +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
##             +- ConvertToUnsafe
##                +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]

关于第二个问题,你的假设是正确的。即使数据已经位于单个节点上,Spark也没有关于数据分布的先验知识,并且会再次对数据进行混洗。

最后,从某种角度来看,您的查询已经是一个Spark查询,或者使用普通Spark不可能执行该查询。

  • 这是一个Spark查询,因为DSL对应方将使用完全相同的机制

    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    w = Window.partitionBy("person").orderBy(col("year").desc())
    df.withColumn("ranking", row_number().over(w))
    
  • 使用普通的Spark是不可能执行的,因为从Spark 1.6开始,还没有窗口函数的本地实现。它在Spark 2.0中发生了变化。

最新更新