在 Spark 数据帧中查找每组的最大行数



我正在尝试使用Spark数据帧而不是RDD,因为它们似乎比RDD更高级,并且倾向于生成更具可读性的代码。

在一个 14 节点的 Google Dataproc 集群中,我有大约 6 万个名字,它们被两个不同的系统翻译成 id:sasb 。每个Row包含nameid_said_sb。我的目标是生成从id_said_sb的映射,以便对于每个id_sa,相应的id_sb是附加到id_sa的所有名称中最常见的id。

让我们尝试用一个例子来澄清。如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

我的目标是生成从a1b2的映射。事实上,与a1相关的名称是n1n2n3,它们分别映射到b1b2b2,所以b2是与a1相关的名称中最常见的映射。同样,a2将被映射到 b2 .假设总会有赢家是可以的:没有必要断绝关系。

我希望我可以在我的数据帧上使用groupBy(df.id_sa),但我不知道下一步该怎么做。我希望有一个聚合,最终可以产生以下行:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]

但也许我试图使用错误的工具,我应该回到使用RDD。

使用 join (在平局的情况下,这将导致组中出现多行(:

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")
cnts.join(maxs, 
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

使用窗口函数(将删除连接(:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())
(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

使用struct排序:

from pyspark.sql.functions import struct
(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))

另请参阅如何选择每个组的第一行?

我认为您可能正在寻找的是窗口函数:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

这是 Scala 中的一个示例(我现在没有可用的带有 Hive 的 Spark Shell,所以我无法测试代码,但我认为它应该可以工作(:

case class MyRow(name: String, id_sa: String, id_sb: String)
val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"),
    MyRow("n2", "a1", "b2"),
    MyRow("n3", "a1", "b2"),
    MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)
myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")

可能有更有效的方法可以使用 Window 函数实现相同的结果,但我希望这为您指明了正确的方向。

in Spark 3.2+:

dd1=df1.pandas_api()
col1=dd1.groupby("id_sa")['id_sb'].transform(lambda ss:ss.count())
dd1['col1']=col1
dd1.groupby("id_sa").apply(lambda dd:dd.sort_values("col1",ascending=False).head(1)).reset_index(drop=True).drop("col1",axis=1)

相关内容

  • 没有找到相关文章

最新更新