我有PySpark数据帧:
user_id | |||||||
---|---|---|---|---|---|---|---|
1 | 1 | 2 | 2 |
只能对具有pivot
属性(方法或属性(的对象执行.pivot
。您尝试执行df.pivot
,所以只有当df
具有这样的属性时,它才会起作用。您可以在这里检查df的所有属性(它是pyspark.sql.DataFrame
类的对象(。您可以在那里看到许多属性,但没有一个属性被称为pivot
。这就是为什么会出现属性错误的原因。
pivot
是pyspark.sql.GroupedData
对象的一种方法。这意味着,为了使用它,您必须以某种方式从pyspark.sql.DataFrame
对象创建pyspark.sql.GroupedData
对象。在您的情况下,使用.groupBy()
:
df.groupBy("user_id").pivot("item_id")
这将创建另一个pyspark.sql.GroupedData
对象。为了从中生成一个数据帧,您需要使用GroupedData
类的一个方法agg
是您需要的方法。在它内部,您必须提供Spark的聚合函数,该函数将用于所有分组元素(例如sum
、first
等(
df.groupBy("user_id").pivot("item_id").agg(F.sum("watched_pct"))
完整示例:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, 1, '2021-05-11', 4250, 72),
(1, 2, '2021-05-11', 80, 99),
(2, 3, '2021-05-11', 1000, 80),
(2, 4, '2021-05-11', 5000, 40)],
['user_id', 'item_id', 'last_watch_dt', 'total_dur', 'watched_pct'])
df = df.groupBy("user_id").pivot("item_id").agg(F.sum("watched_pct"))
df.show()
# +-------+----+----+----+----+
# |user_id| 1| 2| 3| 4|
# +-------+----+----+----+----+
# | 1| 72| 99|null|null|
# | 2|null|null| 80| 40|
# +-------+----+----+----+----+
如果要用0
替换null,请使用pyspark.sql.DataFrame
类的fillna
。
df = df.fillna(0)
df.show()
# +-------+---+---+---+---+
# |user_id| 1| 2| 3| 4|
# +-------+---+---+---+---+
# | 1| 72| 99| 0| 0|
# | 2| 0| 0| 80| 40|
# +-------+---+---+---+---+