根据Spark数据框中另一列的值找到一列的最大值



我有以下数据框架,其中第三列是"clickkedairbnb "第四行是"rank"

|    Tom|             Paris|             |                 1|
|    Tom|            Mumbai|             |                 1|
|    Tom|          Stockolm|             |                 1|
|    Tom|              Oslo|      airbnb1|                 2|
|    Tom|             Tokyo|      airbnb1|                 2|
|    Tom|         Bangalore|      airbnb1|                 2|
|    Sam|             Seoul|     airbnb11|                 1|
|    Sam|             Tokyo|     airbnb11|                 1|
|    Sam|            Berlin|     airbnb12|                 2|
|    Sam|         Bangalore|     airbnb12|                 2|
|    Sam|         Singapore|     airbnb12|                 2|
|    Sam|              Oslo|      airbnb2|                 3|
|    Sam|         Amsterdam|      airbnb2|                 3|
|    Sam|         Bangalore|      airbnb2|                 3|

我想返回那些列"点击airbnb "包含空值和列"排名"上的最大值。小于2。这是我尝试的,但不工作(抱怨对列的无效操作)

val result1and2 = result.where(col("clickedAirbnb").contains("")
                            && max(col("rank")) <= 2)

是否有一种方法来计算列上的最大值?

更新:
为了备份一点,结果df是这样计算的:

val window = Window.partitionBy(df1("User")).orderBy(df1("clickedAirbnb"))
val result = df1.withColumn("clickedDestHotRank", dense_rank().over(window))

现在返回clickedAirbnb(第三列)为空且rank(第四列)的最大值不超过2的用户

val result2 = result.where(col("clickedAirbnb").contains("")
                        && (max(col("rank")) <=2))

看来你想:

  • 只对中没有值的内容进行分组
  • 当且仅当max(rank)<=2

像这样,也许:

//those that have no value in clickedAirbnb
val resultTmp = result.where(col("clickedAirbnb")==="")
//is its max("rank")<=2 ?  
val b = resultTmp.select(max("rank")<=2).first().getBoolean(0)  
if(b){
  resultTmp.show()
}

希望我理解得很好。

我希望这样:

>>> from pyspark.sql.functions import *
>>> sc = spark.sparkContext
>>> rdd = sc.parallelize([
    ['Tom','Paris','',1],
    ['Tom','Mumbai','',1],
    ['Tom','Stockolm','',1],
    ['Tom','Oslo','airbnb1',2],
    ['Tom','Tokyo','airbnb1',2],
    ['Tom','Bangalore','airbnb1',2],
    ['Sam','Seoul','airbnb11',1],
    ['Sam','Tokyo','airbnb11',1],
    ['Sam','Berlin','airbnb12',2],
    ['Sam','Bangalore','airbnb12',2],
    ['Sam','Singapore','airbnb12',2],
    ['Sam','Oslo','airbnb2',3],
    ['Sam','Amsterdam','airbnb2',3],
    ['Sam','Bangalore','airbnb2',3]
])
>>> df = rdd.toDF(['name','city','clickedAirbnb', 'rank'])
>>> df.show()
+----+---------+-------------+----+
|name|     city|clickedAirbnb|rank|
+----+---------+-------------+----+
| Tom|    Paris|             |   1|
| Tom|   Mumbai|             |   1|
| Tom| Stockolm|             |   1|
| Tom|     Oslo|      airbnb1|   2|
| Tom|    Tokyo|      airbnb1|   2|
| Tom|Bangalore|      airbnb1|   2|
| Sam|    Seoul|     airbnb11|   1|
| Sam|    Tokyo|     airbnb11|   1|
| Sam|   Berlin|     airbnb12|   2|
| Sam|Bangalore|     airbnb12|   2|
| Sam|Singapore|     airbnb12|   2|
| Sam|     Oslo|      airbnb2|   3|
| Sam|Amsterdam|      airbnb2|   3|
| Sam|Bangalore|      airbnb2|   3|
+----+---------+-------------+----+
>>> df.where(col("clickedAirbnb") == "").where(col("rank") <= 2).show()
+----+--------+-------------+----+
|name|    city|clickedAirbnb|rank|
+----+--------+-------------+----+
| Tom|   Paris|             |   1|
| Tom|  Mumbai|             |   1|
| Tom|Stockolm|             |   1|
+----+--------+-------------+----+

我将尝试描述一个更一般的情况。在下面的例子中,我们想要选择"value"基于Max of "order"

val df = Seq(("v1", 3), ("v2", 3), ("v3", 2)).toDF("value", "order")
df.show()
// +-----+-----+
// |value|order|
// +-----+-----+
// |   v1|    3|
// |   v2|    3|
// |   v3|    2|
// +-----+-----+

2种方式:

  • 如果你每笔订单只有一个值,或者如果你不在乎哪一个值,只要它对应于"订单"的最大值;

    // Spark 3.3+
    df.groupBy().agg(max_by("value", "order")).show()
    // +--------------------+
    // |max_by(value, order)|
    // +--------------------+
    // |                  v2|
    // +--------------------+
    // Spark 3.0+
    df.groupBy().agg(expr("max_by(value, order)")).show()
    
    import org.apache.spark.sql.expressions.Window
    df.withColumn("_rn", row_number().over(Window.orderBy(desc("order"))))
      .filter("_rn=1").select("value").show()
    // +-----+
    // |value|
    // +-----+
    // |   v1|
    // +-----+
    
  • 如果你有更多的值/你需要一致性/进一步调试的能力,使用这个:

    import org.apache.spark.sql.expressions.Window
    df.withColumn("_rank", rank().over(Window.orderBy(desc("order"))))
      .filter("_rank=1").select("value").show()
    // +-----+
    // |value|
    // +-----+
    // |   v1|
    // |   v2|
    // +-----+
    

注册一个临时表,然后写入所需的查询

your_data_frame.registerTempTable("table1");
res = sqlCtx.sql("select * where clickedAirbnb = "" and max(rank)<=2 from table1) ;

相关内容

  • 没有找到相关文章