How to convert groupBy to reduceByKey in a PySpark DataFrame



I have written PySpark code that uses groupBy and the sum function. I feel the performance is being hurt by the groupBy, so I would like to use reduceByKey instead, but I am new to this area. Please find my scenario below.

Step 1: Read the Hive table data through a sqlContext join query and store it in a DataFrame.
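The actual query is not shown in the question; a minimal, hypothetical sketch of this step might look like the following (the database, table names, and join condition are assumptions):

# Hypothetical example of step 1: real table names and join condition
# are not given in the question.
input_df = sqlContext.sql("""
    SELECT a.*, b.*
    FROM db.table_a a
    JOIN db.table_b b ON a.key = b.key
""")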

Step 2: The total number of input columns is 15. Of these, 5 are key fields and the rest are numeric values.

Step 3: In addition to the input columns above, a few more columns need to be derived from the numeric columns, and a few columns have default values.

Step 4: I have used groupBy and sum functions. How can I implement similar logic the Spark way using map and reduceByKey?

from pyspark.sql.functions import col, when, lit, concat, round, sum

# sample data
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"])

# populate col5, col6, col7
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4') / col('col3'), 2)).otherwise(0)
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4')) / col('col1'), 2)).otherwise(0)
col7 = col('col2')
df1 = df.withColumn("col5", col5) \
    .withColumn("col6", col6) \
    .withColumn("col7", col7)

# populate col8, col9, col10
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4') / col('col3'), 2)).otherwise(0)
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4')) / col('col1'), 2)).otherwise(0)
col10 = concat(col('col2'), lit("_NEW"))
df2 = df.withColumn("col5", col8) \
    .withColumn("col6", col9) \
    .withColumn("col7", col10)

# final dataframe
final_df = df1.union(df2)
final_df.show()

# groupBy calculation
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5"), ..., sum("coln")).show()

There is no reduceByKey in Spark SQL.

groupBy with aggregation functions behaves almost the same as RDD.reduceByKey. Spark automatically chooses whether the aggregation should work like RDD.groupByKey (e.g., for collect_list) or like RDD.reduceByKey.
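Purely for comparison, here is a minimal sketch of the same aggregation written with map and reduceByKey on final_df from the question; the key/value split and the null handling are assumptions, and it only sums col5 and col6. The DataFrame groupBy above remains the simpler and usually faster option.

# For comparison only: RDD map + reduceByKey over final_df.
pair_rdd = final_df.rdd.map(
    lambda r: ((r['col1'], r['col2'], r['col3'], r['col4']),
               (r['col5'] or 0.0, r['col6'] or 0.0)))   # treat possible nulls as 0
summed = pair_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
summed.take(10)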

The performance of Dataset.groupBy with aggregation functions should be better than or equal to RDD.reduceByKey. The Catalyst optimizer takes care of how the aggregation is executed behind the scenes.
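One way to see this (a sketch, reusing final_df and sum from the question's code) is to inspect the physical plan: for groupBy with sum it typically shows a partial HashAggregate before the shuffle (Exchange) and a final HashAggregate after it, which is the same map-side combine idea that reduceByKey relies on.

# Inspect the physical plan of the aggregation from the question.
# A partial HashAggregate before the Exchange indicates map-side
# pre-aggregation, analogous to reduceByKey's combine step.
final_df.groupBy("col1", "col2", "col3", "col4") \
    .agg(sum("col5").alias("sum_col5")) \
    .explain()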
