我有以下数据框架,其中第三列是"clickkedairbnb "第四行是"rank"
| Tom| Paris| | 1|
| Tom| Mumbai| | 1|
| Tom| Stockolm| | 1|
| Tom| Oslo| airbnb1| 2|
| Tom| Tokyo| airbnb1| 2|
| Tom| Bangalore| airbnb1| 2|
| Sam| Seoul| airbnb11| 1|
| Sam| Tokyo| airbnb11| 1|
| Sam| Berlin| airbnb12| 2|
| Sam| Bangalore| airbnb12| 2|
| Sam| Singapore| airbnb12| 2|
| Sam| Oslo| airbnb2| 3|
| Sam| Amsterdam| airbnb2| 3|
| Sam| Bangalore| airbnb2| 3|
我想返回那些列"点击airbnb "包含空值和列"排名"上的最大值。小于2。这是我尝试的,但不工作(抱怨对列的无效操作)
val result1and2 = result.where(col("clickedAirbnb").contains("")
&& max(col("rank")) <= 2)
是否有一种方法来计算列上的最大值?
更新:
为了备份一点,结果df是这样计算的:
val window = Window.partitionBy(df1("User")).orderBy(df1("clickedAirbnb"))
val result = df1.withColumn("clickedDestHotRank", dense_rank().over(window))
现在返回clickedAirbnb(第三列)为空且rank(第四列)的最大值不超过2的用户
val result2 = result.where(col("clickedAirbnb").contains("")
&& (max(col("rank")) <=2))
看来你想:
- 只对中没有值的内容进行分组
- 当且仅当max(rank)<=2
像这样,也许:
//those that have no value in clickedAirbnb
val resultTmp = result.where(col("clickedAirbnb")==="")
//is its max("rank")<=2 ?
val b = resultTmp.select(max("rank")<=2).first().getBoolean(0)
if(b){
resultTmp.show()
}
希望我理解得很好。
我希望这样:
>>> from pyspark.sql.functions import *
>>> sc = spark.sparkContext
>>> rdd = sc.parallelize([
['Tom','Paris','',1],
['Tom','Mumbai','',1],
['Tom','Stockolm','',1],
['Tom','Oslo','airbnb1',2],
['Tom','Tokyo','airbnb1',2],
['Tom','Bangalore','airbnb1',2],
['Sam','Seoul','airbnb11',1],
['Sam','Tokyo','airbnb11',1],
['Sam','Berlin','airbnb12',2],
['Sam','Bangalore','airbnb12',2],
['Sam','Singapore','airbnb12',2],
['Sam','Oslo','airbnb2',3],
['Sam','Amsterdam','airbnb2',3],
['Sam','Bangalore','airbnb2',3]
])
>>> df = rdd.toDF(['name','city','clickedAirbnb', 'rank'])
>>> df.show()
+----+---------+-------------+----+
|name| city|clickedAirbnb|rank|
+----+---------+-------------+----+
| Tom| Paris| | 1|
| Tom| Mumbai| | 1|
| Tom| Stockolm| | 1|
| Tom| Oslo| airbnb1| 2|
| Tom| Tokyo| airbnb1| 2|
| Tom|Bangalore| airbnb1| 2|
| Sam| Seoul| airbnb11| 1|
| Sam| Tokyo| airbnb11| 1|
| Sam| Berlin| airbnb12| 2|
| Sam|Bangalore| airbnb12| 2|
| Sam|Singapore| airbnb12| 2|
| Sam| Oslo| airbnb2| 3|
| Sam|Amsterdam| airbnb2| 3|
| Sam|Bangalore| airbnb2| 3|
+----+---------+-------------+----+
>>> df.where(col("clickedAirbnb") == "").where(col("rank") <= 2).show()
+----+--------+-------------+----+
|name| city|clickedAirbnb|rank|
+----+--------+-------------+----+
| Tom| Paris| | 1|
| Tom| Mumbai| | 1|
| Tom|Stockolm| | 1|
+----+--------+-------------+----+
我将尝试描述一个更一般的情况。在下面的例子中,我们想要选择"value"基于Max of "order"
val df = Seq(("v1", 3), ("v2", 3), ("v3", 2)).toDF("value", "order")
df.show()
// +-----+-----+
// |value|order|
// +-----+-----+
// | v1| 3|
// | v2| 3|
// | v3| 2|
// +-----+-----+
2种方式:
如果你每笔订单只有一个值,或者如果你不在乎哪一个值,只要它对应于"订单"的最大值;
// Spark 3.3+ df.groupBy().agg(max_by("value", "order")).show() // +--------------------+ // |max_by(value, order)| // +--------------------+ // | v2| // +--------------------+ // Spark 3.0+ df.groupBy().agg(expr("max_by(value, order)")).show()
import org.apache.spark.sql.expressions.Window df.withColumn("_rn", row_number().over(Window.orderBy(desc("order")))) .filter("_rn=1").select("value").show() // +-----+ // |value| // +-----+ // | v1| // +-----+
如果你有更多的值/你需要一致性/进一步调试的能力,使用这个:
import org.apache.spark.sql.expressions.Window df.withColumn("_rank", rank().over(Window.orderBy(desc("order")))) .filter("_rank=1").select("value").show() // +-----+ // |value| // +-----+ // | v1| // | v2| // +-----+
注册一个临时表,然后写入所需的查询
your_data_frame.registerTempTable("table1");
res = sqlCtx.sql("select * where clickedAirbnb = "" and max(rank)<=2 from table1) ;