Extract the ID and the distinct errors between two rows that contain the same ID



I have a Spark DataFrame with two columns ("Time_stamp" and "Message").

Sample DataFrame:

Time_stamp                   Message
2020-12-01 05:28:34:215      some text1 ID: 1
2020-12-01 05:28:40:210      some text2 error: A
2020-12-01 05:28:40:220      some text3 error: B
2020-12-01 05:28:41:203      some text4 error: A
2020-12-01 05:30:43:201      some text5 ID: 1
2020-12-01 05:32:50:215      some text6 ID: 2
2020-12-01 05:32:50:220      some text7 error: A
2020-12-01 05:48:51:220      some text8 error: C
2020-12-01 05:48:52:203      some text9 error: B
2020-12-01 05:51:53:201      some text10 ID: 2

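I want to build another DataFrame that lists each ID together with the distinct errors that occur between the two rows containing that ID.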
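To pin down the rule, here is a minimal plain-Python sketch (no Spark), assuming that every error belongs to the most recent "ID: ..." row before it and that each error is listed once per ID in order of first appearance. `ROWS` and `distinct_errors` are illustrative names, not part of any API.

```python
import re

# Sample rows from the question: (Time_stamp, Message)
ROWS = [
    ("2020-12-01 05:28:34:215", "some text1 ID: 1"),
    ("2020-12-01 05:28:40:210", "some text2 error: A"),
    ("2020-12-01 05:28:40:220", "some text3 error: B"),
    ("2020-12-01 05:28:41:203", "some text4 error: A"),
    ("2020-12-01 05:30:43:201", "some text5 ID: 1"),
    ("2020-12-01 05:32:50:215", "some text6 ID: 2"),
    ("2020-12-01 05:32:50:220", "some text7 error: A"),
    ("2020-12-01 05:48:51:220", "some text8 error: C"),
    ("2020-12-01 05:48:52:203", "some text9 error: B"),
    ("2020-12-01 05:51:53:201", "some text10 ID: 2"),
]

ID_RE = re.compile(r"ID: ([a-zA-Z0-9]+)")
ERROR_RE = re.compile(r"error: (.*)")


def distinct_errors(rows):
    """Attribute each error to the most recent ID row and keep only the
    first occurrence of each (ID, error) pair, in timestamp order."""
    current_id = None
    seen = set()
    result = []
    # The fixed-width timestamps sort chronologically as plain strings
    for _, message in sorted(rows):
        id_match = ID_RE.search(message)
        if id_match:
            current_id = id_match.group(1)
            continue
        err_match = ERROR_RE.search(message)
        if err_match and current_id is not None:
            pair = (current_id, err_match.group(1))
            if pair not in seen:
                seen.add(pair)
                result.append(pair)
    return result


print(distinct_errors(ROWS))
```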

Expected output:

ID          Error
1           A
1           B
2           A
2           C
2           B

I tried the code below. However, it uses window functions, which Azure Databricks does not support, and it takes a long time to run.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df2 = (
    df
    # Parse the string timestamp, e.g. "2020-12-01 05:28:34:215"
    .withColumn('Time_stamp', F.to_timestamp('Time_stamp', 'yyyy-MM-dd HH:mm:ss:SSS'))
    # Extract the ID from "ID: ..." rows (empty string on all other rows)
    .withColumn('ID', F.regexp_extract('Message', 'ID: ([a-zA-Z0-9]+)', 1))
    # Forward-fill the most recent non-empty ID onto every row
    .withColumn(
        'ID',
        F.last(F.when(F.col('ID') != '', F.col('ID')), True).over(Window.orderBy('Time_stamp'))
    )
    # Keep only the error rows and reduce Message to the error code
    .filter(F.col('Message').rlike('error'))
    .withColumn('Message', F.regexp_extract('Message', 'error: (.*)', 1))
    # Collect the distinct (error, timestamp) pairs per ID, then flatten them again
    .groupBy('ID')
    .agg(F.collect_set(F.array('Message', 'Time_stamp')).alias('Message'))
    .select('ID', F.explode('Message').alias('Message'))
    .selectExpr('ID', 'Message[0] as error', 'Message[1] as Time_stamp')
    # Keep only the first occurrence of each error per ID
    .withColumn(
        'rn',
        F.row_number().over(Window.partitionBy('ID', 'error').orderBy('Time_stamp'))
    )
    .filter('rn = 1')
    .orderBy('Time_stamp')
    .select('ID', 'error')
)
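The collect_set / explode / row_number steps above only serve to keep the first occurrence of each error per ID. A minimal sketch of a shorter pipeline, assuming the same `df` with string columns `Time_stamp` and `Message`, replaces them with a single `groupBy('ID', 'error')` plus `min('Time_stamp')`. Note that the forward-fill window has no `partitionBy`, so Spark moves all rows into one partition for that step (it logs a "No Partition Defined for Window operation" warning); that is a likely cause of the slowness, and this sketch does not remove it. `distinct_errors_per_id` is a hypothetical helper name.

```python
def distinct_errors_per_id(df):
    """Return (ID, error) rows, each error listed once per ID, ordered by first appearance.

    Assumes `df` is a Spark DataFrame with string columns Time_stamp and Message.
    """
    # Imported inside the function so the sketch can be defined without PySpark installed
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window

    # Explicit running frame: every row sees all earlier rows up to itself.
    # No partitionBy, so this step still runs on a single partition.
    fill_id = Window.orderBy('Time_stamp').rowsBetween(Window.unboundedPreceding, Window.currentRow)

    return (
        df.withColumn('Time_stamp', F.to_timestamp('Time_stamp', 'yyyy-MM-dd HH:mm:ss:SSS'))
        .withColumn('ID', F.regexp_extract('Message', r'ID: ([a-zA-Z0-9]+)', 1))
        # Forward-fill the most recent non-empty ID
        .withColumn('ID', F.last(F.when(F.col('ID') != '', F.col('ID')), ignorenulls=True).over(fill_id))
        .filter(F.col('Message').rlike('error: '))
        .withColumn('error', F.regexp_extract('Message', r'error: (.*)', 1))
        # One row per (ID, error); the earliest timestamp fixes the output order
        .groupBy('ID', 'error')
        .agg(F.min('Time_stamp').alias('first_seen'))
        .orderBy('first_seen')
        .select('ID', 'error')
    )
```

Usage would be `distinct_errors_per_id(df).show()`; on the sample data it should return the same five rows as the expected output.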

Can anyone provide a SQL solution? Spark SQL is well supported in Azure Databricks.

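There's not much to say: below is a direct translation of your PySpark code into Spark SQL, although I think the PySpark version looks nicer than the SQL one...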

df.createOrReplaceTempView('df')
result = spark.sql("""
select ID, error
from (
    select *, row_number() over (partition by ID, error order by Time_stamp) rn
    from (
        select ID, Message[0] error, Message[1] Time_stamp
        from (
            select ID, explode(Message) Message
            from (
                select ID, collect_set(array(Message, Time_stamp)) Message
                from (
                    select Time_stamp, regexp_extract(Message, 'error: (.*)', 1) Message, ID
                    from (
                        select Time_stamp, Message,
                               last(case when ID != '' then ID end, true) over (order by Time_stamp) ID
                        from (
                            select to_timestamp(Time_stamp, 'yyyy-MM-dd HH:mm:ss:SSS') Time_stamp,
                                   Message,
                                   regexp_extract(Message, 'ID: ([a-zA-Z0-9]+)', 1) ID
                            from df
                        )
                    ) where Message rlike 'error'
                ) group by ID
            )
        )
    )
) where rn = 1 order by Time_stamp""")
result.show()
+---+-----+
| ID|error|
+---+-----+
|  1|    A|
|  1|    B|
|  2|    A|
|  2|    C|
|  2|    B|
+---+-----+
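If the nesting is hard to follow, a flatter query is possible. This is a minimal sketch with a different forward-fill technique swapped in: a running `count(ID)` numbers each segment that starts at an ID row, and `max(ID) over (partition by grp)` copies that ID to the whole segment; `group by ID, error` with `order by min(Time_stamp)` then replaces the collect_set / explode / row_number steps. It uses `instr`/`substr` instead of `regexp_extract` so the same string can be exercised with Python's built-in `sqlite3` here. I have not run it on Databricks, but the functions it uses also exist in Spark SQL, so `spark.sql(PORTABLE_SQL)` after `df.createOrReplaceTempView('df')` should work. It assumes unique timestamps and leaves `Time_stamp` as text (the fixed-width format sorts chronologically); unlike the query above, it drops errors that appear before the first ID. `PORTABLE_SQL` and `run_in_sqlite` are illustrative names.

```python
import sqlite3

# Sample rows from the question: (Time_stamp, Message)
ROWS = [
    ("2020-12-01 05:28:34:215", "some text1 ID: 1"),
    ("2020-12-01 05:28:40:210", "some text2 error: A"),
    ("2020-12-01 05:28:40:220", "some text3 error: B"),
    ("2020-12-01 05:28:41:203", "some text4 error: A"),
    ("2020-12-01 05:30:43:201", "some text5 ID: 1"),
    ("2020-12-01 05:32:50:215", "some text6 ID: 2"),
    ("2020-12-01 05:32:50:220", "some text7 error: A"),
    ("2020-12-01 05:48:51:220", "some text8 error: C"),
    ("2020-12-01 05:48:52:203", "some text9 error: B"),
    ("2020-12-01 05:51:53:201", "some text10 ID: 2"),
]

PORTABLE_SQL = """
with parsed as (
    -- Pull the ID or the error code out of each message (NULL when absent)
    select Time_stamp,
           case when instr(Message, 'ID: ') > 0
                then substr(Message, instr(Message, 'ID: ') + 4) end as ID,
           case when instr(Message, 'error: ') > 0
                then substr(Message, instr(Message, 'error: ') + 7) end as error
    from df
),
segments as (
    -- count() skips NULLs, so this running count goes up by one at every ID row
    select *, count(ID) over (order by Time_stamp) as grp
    from parsed
),
filled as (
    -- Every segment except grp 0 begins with exactly one ID row; copy that ID to the segment
    select max(ID) over (partition by grp) as ID, error, Time_stamp
    from segments
)
select ID, error
from filled
where error is not null and ID is not null
group by ID, error
order by min(Time_stamp)
"""


def run_in_sqlite(rows):
    """Load (Time_stamp, Message) rows into an in-memory table named df and run the query."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("create table df (Time_stamp text, Message text)")
        con.executemany("insert into df values (?, ?)", rows)
        return con.execute(PORTABLE_SQL).fetchall()
    finally:
        con.close()


print(run_in_sqlite(ROWS))
```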
