pyspark 应用数据帧窗口函数与过滤器



我有一个数据集,列:id,timestamp,x,y

id  timestamp   x      y 
0   1443489380  100    1
0   1443489390  200    0
0   1443489400  300    0
0   1443489410  400    1

我定义了一个窗口规范:w = Window.partitionBy("id").orderBy("timestamp")

我想做这样的事情。创建一个新列,将当前行的 x 与下一行的 x 相加。

如果总和>= 500,则设置新列 = 大,否则小。

df = df.withColumn("newCol", 
when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
.otherwise("SMALL") )

但是,我想在执行此操作之前过滤数据,而不会影响原始 df

[只有 y = 1 的行才会应用上面的代码]

因此,将应用上述代码的数据只有这 2 行。

0 , 1443489380, 100 , 1

0 , 1443489410, 400 , 1

我已经这样做了,但太糟糕了。

df2 = df.filter(df.y == 1)
df2 = df2.withColumn("newCol", 
when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
.otherwise("SMALL") )
df = df.join(df2, ["id","timestamp"], "outer")

我想做这样的事情,但这是不可能的,因为它会导致属性错误:"数据帧"对象没有属性"when">

df = df.withColumn("newCol", df.filter(df.y == 1)
.when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
.otherwise("SMALL") )

总之,我只想对 y = 1 的行做一个临时过滤器

,然后用下一个 x 和 x 求和 x。

您的代码工作正常,我认为您 din 导入函数模块。尝试了您的代码,

>>> from pyspark.sql import functions as F
>>> df2 = df2.withColumn("newCol", 
F.when((df.x + F.lag(df.x,-1).over(w))>= 500 , "BIG")
.otherwise("SMALL") )
>>> df2.show()
+---+----------+---+---+------+
| id| timestamp|  x|  y|newCol|
+---+----------+---+---+------+
|  0|1443489380|100|  1|   BIG|
|  0|1443489410|400|  1| SMALL|
+---+----------+---+---+------+

编辑: 尝试过根据"id","y"列更改窗口分区,

>>> w = Window.partitionBy("id","y").orderBy("timestamp")
>>> df.select("*", F.when(df.y == 1,F.when((df.x+F.lag("x",-1).over(w)) >=500,'BIG').otherwise('SMALL')).otherwise(None).alias('new_col')).show()
+---+----------+---+---+-------+
| id| timestamp|  x|  y|new_col|
+---+----------+---+---+-------+
|  0|1443489380|100|  1|    BIG|
|  0|1443489410|400|  1|  SMALL|
|  0|1443489390|200|  0|   null|
|  0|1443489400|300|  0|   null|
+---+----------+---+---+-------+

最新更新