I have a Spark dataframe with two columns ("time_stamp" and "message").
Sample dataframe:
Time_stamp Message
2020-12-01 05:28:34:215 some text1 ID: 1
2020-12-01 05:28:40:210 some text2 error: A
2020-12-01 05:28:40:220 some text3 error: B
2020-12-01 05:28:41:203 some text4 error: A
2020-12-01 05:30:43:201 some text5 ID: 1
2020-12-01 05:32:50:215 some text6 ID: 2
2020-12-01 05:32:50:220 some text7 error: A
2020-12-01 05:48:51:220 some text8 error: C
2020-12-01 05:48:52:203 some text9 error: B
2020-12-01 05:51:53:201 some text10 ID: 2
I want to build another dataframe containing each ID together with the distinct errors that appear between two rows carrying that same ID.
Expected output:
ID Error
1 A
1 B
2 A
2 C
2 B
I tried the following code. However, it uses window functions that Azure Databricks does not support well, and the code takes a long time to execute.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df2 = (
    df.withColumn(
        'Time_stamp',
        F.to_timestamp('Time_stamp', 'yyyy-MM-dd HH:mm:ss:SSS')
    ).withColumn(
        'ID',
        F.regexp_extract('Message', 'ID: ([a-zA-Z0-9]+)', 1)
    ).withColumn(
        'ID',
        F.last(F.when(F.col('ID') != '', F.col('ID')), True).over(Window.orderBy('Time_stamp'))
    ).filter(
        F.col('Message').rlike('error')
    ).withColumn(
        'Message',
        F.regexp_extract('Message', 'error: (.*)', 1)
    ).groupBy('ID').agg(
        F.collect_set(F.array('Message', 'Time_stamp')).alias('Message')
    ).select(
        'ID',
        F.explode('Message').alias('Message')
    ).selectExpr(
        'ID',
        'Message[0] as error',
        'Message[1] as Time_stamp'
    ).withColumn(
        'rn',
        F.row_number().over(Window.partitionBy('ID', 'error').orderBy('Time_stamp'))
    ).filter('rn = 1').orderBy('Time_stamp').select('ID', 'error')
)
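For reference, the core of this pipeline — forward-filling the most recently seen ID onto each error row, then keeping each (ID, error) pair only at its first occurrence — can be sketched in plain Python on the sample rows. This is only an illustration of the logic, not Spark code; the function name is made up for the sketch:

```python
import re

# Sample (timestamp, message) rows from the question
rows = [
    ("2020-12-01 05:28:34:215", "some text1 ID: 1"),
    ("2020-12-01 05:28:40:210", "some text2 error: A"),
    ("2020-12-01 05:28:40:220", "some text3 error: B"),
    ("2020-12-01 05:28:41:203", "some text4 error: A"),
    ("2020-12-01 05:30:43:201", "some text5 ID: 1"),
    ("2020-12-01 05:32:50:215", "some text6 ID: 2"),
    ("2020-12-01 05:32:50:220", "some text7 error: A"),
    ("2020-12-01 05:48:51:220", "some text8 error: C"),
    ("2020-12-01 05:48:52:203", "some text9 error: B"),
    ("2020-12-01 05:51:53:201", "some text10 ID: 2"),
]

def first_errors_per_id(rows):
    """Forward-fill the last seen ID, then collect each
    (ID, error) pair once, in order of first appearance."""
    current_id = None
    seen = set()
    out = []
    for _, message in sorted(rows):  # sorted by timestamp string
        id_match = re.search(r'ID: ([a-zA-Z0-9]+)', message)
        if id_match:
            current_id = id_match.group(1)
            continue
        err_match = re.search(r'error: (.*)', message)
        if err_match and current_id is not None:
            pair = (current_id, err_match.group(1))
            if pair not in seen:  # keep the first occurrence only
                seen.add(pair)
                out.append(pair)
    return out

print(first_errors_per_id(rows))
# [('1', 'A'), ('1', 'B'), ('2', 'A'), ('2', 'C'), ('2', 'B')]
```

This reproduces the expected output above; the Spark versions express the same forward-fill with `last(..., ignorenulls)` over a time-ordered window and the first-occurrence filter with `row_number() = 1`.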
Can someone provide a SQL solution? Spark SQL is well supported in Azure Databricks.
Not much to say, except that I think the PySpark version reads better than the Spark SQL one...
df.createOrReplaceTempView('df')
result = spark.sql("""
select ID, error
from (
select *, row_number() over (partition by ID, error order by Time_stamp) rn
from (
select ID, Message[0] error, Message[1] Time_stamp
from (
select ID, explode(Message) Message
from (
select ID, collect_set(array(Message, Time_stamp)) Message
from (
select Time_stamp, regexp_extract(Message, 'error: (.*)', 1) Message, ID
from (
select Time_stamp, Message, last(case when ID != '' then ID end, true) over (order by Time_stamp) ID
from (
select to_timestamp(Time_stamp, 'yyyy-MM-dd HH:mm:ss:SSS') Time_stamp, Message, regexp_extract(Message, 'ID: ([a-zA-Z0-9]+)', 1) ID
from df
)
) where Message rlike 'error'
) group by ID
)
)
)
) where rn = 1 order by Time_stamp""")
result.show()
+---+-----+
| ID|error|
+---+-----+
| 1| A|
| 1| B|
| 2| A|
| 2| C|
| 2| B|
+---+-----+
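A side note on why the final `row_number` step is needed: `collect_set` deduplicates whole `(error, Time_stamp)` pairs, so the same error can survive several times when its timestamps differ; `row_number() over (partition by ID, error order by Time_stamp) = 1` then keeps only the earliest occurrence of each pair. The same "keep first per key" step looks like this in plain Python (function and variable names are illustrative):

```python
# Pairs after collect_set/explode can still repeat an error
# because the timestamps differ.
pairs = [
    ("1", "A", "2020-12-01 05:28:40:210"),
    ("1", "B", "2020-12-01 05:28:40:220"),
    ("1", "A", "2020-12-01 05:28:41:203"),  # same (ID, error), later time
]

def keep_earliest(pairs):
    """Emulate row_number() over (partition by ID, error
    order by Time_stamp) and keep only rn = 1."""
    best = {}
    for id_, err, ts in pairs:
        key = (id_, err)
        if key not in best or ts < best[key]:
            best[key] = ts  # remember the earliest timestamp per pair
    # order by the surviving timestamp, as the outer query does
    return [(k[0], k[1]) for k, ts in sorted(best.items(), key=lambda kv: kv[1])]

print(keep_earliest(pairs))
# [('1', 'A'), ('1', 'B')]
```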