我是Spark上的初学者,我尝试提出请求允许我检索访问量最多的网页。
我的请求是以下
mostPopularWebPageDF = logDF.groupBy("webPage").agg(functions.count("webPage").alias("cntWebPage")).agg(functions.max("cntWebPage")).show()
使用此请求,我仅检索具有最大计数的数据框
类似的东西:
webPage max(cntWebPage)
google.com 2
我该如何解决我的问题?
非常感谢。
在pyspark sql中:
logDF.registerTempTable("logDF")
mostPopularWebPageDF = sqlContext.sql("""select webPage, cntWebPage from (
select webPage, count(*) as cntWebPage, max(count(*)) over () as maxcnt
from logDF
group by webPage) as tmp
where tmp.cntWebPage = tmp.maxcnt""")
也许我可以使其更干净,但它起作用。我将尝试优化它。
我的结果:
webPage cntWebPage
google.com 2
数据集:
webPage usersid
google.com 1
google.com 3
bing.com 10
说明:正常计数是通过分组 计数(*)函数完成的。所有这些计数的最大值是通过窗口函数计算的,因此对于上面的数据集,即时数据框/不删除MaxCount列/IS:
webPage count maxCount
google.com 2 2
bing.com 1 2
然后,我们选择的行等于MaxCount
编辑:我已删除了DSL版本 - 它不支持()上的窗口,并且订购正在改变结果。对不起这个错误。SQL版本是正确的