表:
+---------+---------+---------+---------+
|id |path |error |message |
+---------+---------+---------+---------+
| 1 | a.a.a| true | "aaa" |
+---------+---------+---------+---------+
| 2 | a.a.a| true | "bbb" |
+---------+---------+---------+---------+
| 2 | a.a.a| true | "bbc" |
+---------+---------+---------+---------+
| 2 | a.a.b| false | "ccc" |
+---------+---------+---------+---------+
我有pySpark查询:
data.groupBy('id', 'path')
.agg(
sum(when(col('error') == 'true', 1).otherwise(0)).alias('count'),
).show()
如何使用第一个具有col('error') == 'true'
的示例元素添加新列?我想要一张有id, path, count, exampleItem
元素的表
函数first().alias('exampleItem')
起作用,但返回的元素不一定符合上述条件。
您可以定义一个列,该列仅在错误为true的情况下包含消息,否则使用when
函数为null。然后,ignorenulls
设置为true时使用的first
函数将提供您所期望的结果。
d = [(1, "a.a.a", True, "aaa"), (2, "a.a.a", True, "bbb"),
(2, "a.a.a", True, "bbc"), (2, "a.a.b", False, "ccc")]
data = spark.createDataFrame(d, ['id', 'path', 'error', 'message'])
data
.groupBy('id', 'path')
.agg(F.sum(F.when(F.col('error') == 'true', 1).otherwise(0)).alias('count'),
F.first(F.when(F.col('error'), F.col('message')), ignorenulls=True).alias('exampleItem'))
.show()
+---+-----+-----+-----------+
| id| path|count|exampleItem|
+---+-----+-----+-----------+
| 2|a.a.a| 2| bbb|
| 1|a.a.a| 1| aaa|
| 2|a.a.b| 0| null|
+---+-----+-----+-----------+
对于最后一行,空值是由于没有消息满足所需的要求。
您可以将first
与ignorenulls=True
选项和case when
语句一起使用,以获得带有error = true
的第一条消息。
from pyspark.sql import functions as F, Window
df2 = df.groupBy(
'id', 'path'
).agg(
F.sum(F.when(F.col('error') == 'true', 1).otherwise(0)).alias('count'),
F.first(
F.when(F.col('error') == 'true', F.col('message')),
ignorenulls=True
).alias('exampleItem')
).orderBy('id', 'path')
df2.show()
+---+-----+-----+-----------+
| id| path|count|exampleItem|
+---+-----+-----+-----------+
| 1|a.a.a| 1| aaa|
| 2|a.a.a| 2| bbb|
| 2|a.a.b| 0| null|
+---+-----+-----+-----------+
但是,请注意,您的示例数据帧中没有定义排序列,因此first
没有具体含义,只会返回一个带有error = true
的随机元素。假设应该有一个时间戳列,可以用来对每个id/path分区中的数据帧进行排序。