I have some data like this:
data = [("1","1"), ("1","1"), ("1","1"), ("2","1"), ("2","1"), ("3","1"), ("3","1"), ("4","1"),]
df =spark.createDataFrame(data=data,schema=["id","imp"])
df.createOrReplaceTempView("df")
+---+---+
| id|imp|
+---+---+
| 1| 1|
| 1| 1|
| 1| 1|
| 2| 1|
| 2| 1|
| 3| 1|
| 3| 1|
| 4| 1|
+---+---+
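For completeness, the snippet above assumes a SparkSession already bound to the name spark; a minimal local setup (master and app name here are purely illustrative) could be:

from pyspark.sql import SparkSession

# local session just for reproducing this example; master/appName are illustrative
spark = SparkSession.builder.master("local[*]").appName("running-sum-demo").getOrCreate()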
I want the count of ids grouped by id, along with its running sum and total sum. This is the code I'm using:
query = """
select id,
count(id) as count,
sum(count(id)) over (order by count(id) desc) as running_sum,
sum(count(id)) over () as total_sum
from df
group by id
order by count desc
"""
spark.sql(query).show()
+---+-----+-----------+---------+
| id|count|running_sum|total_sum|
+---+-----+-----------+---------+
| 1| 3| 3| 8|
| 2| 2| 7| 8|
| 3| 2| 7| 8|
| 4| 1| 8| 8|
+---+-----+-----------+---------+
The problem is with the running_sum column. For some reason it automatically groups the two counts of 2 together when summing, and shows 7 for both id 2 and id 3.
This is the result I expected:
+---+-----+-----------+---------+
| id|count|running_sum|total_sum|
+---+-----+-----------+---------+
| 1| 3| 3| 8|
| 2| 2| 5| 8|
| 3| 2| 7| 8|
| 4| 1| 8| 8|
+---+-----+-----------+---------+
You should do the running sum in an outer query. The 7 for both id 2 and id 3 comes from the window frame: with an ORDER BY and no explicit frame, the default is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, and a RANGE frame treats rows with equal ordering values (here, the two counts of 2) as peers and sums them together. Computing the count first and ordering the running sum by the unique id with a ROWS frame avoids the tie.
spark.sql('''
    select *,
           sum(cnt) over (order by id rows between unbounded preceding and current row) as run_sum,
           sum(cnt) over () as tot_sum
    from (
        select id, count(id) as cnt
        from df  -- the temp view registered in the question
        group by id
    )
''').show()
# +---+---+-------+-------+
# | id|cnt|run_sum|tot_sum|
# +---+---+-------+-------+
# | 1| 3| 3| 8|
# | 2| 2| 5| 8|
# | 3| 2| 7| 8|
# | 4| 1| 8| 8|
# +---+---+-------+-------+
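Alternatively, here is a sketch of the same idea kept in a single query (a variation of mine, assuming the df view from above): give the running sum an explicit ROWS frame and break ties on the unique id.

spark.sql('''
    select id,
           count(id) as cnt,
           -- explicit ROWS frame plus tie-break on id, so peer rows are no longer lumped together
           sum(count(id)) over (order by count(id) desc, id
                                rows between unbounded preceding and current row) as run_sum,
           sum(count(id)) over () as tot_sum
    from df
    group by id
    order by cnt desc
''').show()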
Using the DataFrame API:
import sys

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

# df is the DataFrame created in the question
(df
    .groupBy('id')
    .agg(func.count('id').alias('cnt'))
    .withColumn('run_sum',
                func.sum('cnt').over(wd.partitionBy().orderBy('id').rowsBetween(-sys.maxsize, 0)))
    .withColumn('tot_sum', func.sum('cnt').over(wd.partitionBy()))
    .show())
# +---+---+-------+-------+
# | id|cnt|run_sum|tot_sum|
# +---+---+-------+-------+
# | 1| 3| 3| 8|
# | 2| 2| 5| 8|
# | 3| 2| 7| 8|
# | 4| 1| 8| 8|
# +---+---+-------+-------+
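As a minor style point, the frame bounds can also be written with Window's named constants instead of -sys.maxsize; a sketch using the same names as above:

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

# same running sum, with explicit named frame bounds
run_win = wd.partitionBy().orderBy('id').rowsBetween(wd.unboundedPreceding, wd.currentRow)
(df
    .groupBy('id')
    .agg(func.count('id').alias('cnt'))
    .withColumn('run_sum', func.sum('cnt').over(run_win))
    .show())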