我想使用 Spark 结构化流处理多行数据集。示例数据集如下
{"reqID":"id3", "time":1577085247000, "type":"start"}
{"reqID":"id3", "time":1577085250000, "type":"sysstart"}
{"reqID":"id3", "time":1577085256000, "type":"sysend"}
{"reqID":"id3", "time":1577085260000, "type":"end"}
{"reqID":"id4", "time":1577085263000, "type":"start"}
{"reqID":"id4", "time":1577085266000, "type":"sysstart"}
{"reqID":"id4", "time":1577085269000, "type":"sysend"}
{"reqID":"id4", "time":1577085278000, "type":"end"}
我想执行基于reqID
的end_time(time for type end) - start_time(type for type start)
等操作。
我尝试按reqID
分组,并且能够在聚合期间合并事件,但合并的事件ArrayType
,我无法执行所需的操作。
作为一种替代方法,我尝试过透视,但它不适用于流媒体。它仅适用于批处理。
这种情况的解决方案是什么?
您可以使用 where 和连接条件,选择您想要的棋子,加入它们并根据需要进行操作:
df.where($"type" === "start")
.drop("type")
.withColumnRenamed("time", "startTime")
.join(df.where($"type" === "end")
.drop("type")
.withColumnRenamed("time", "endTime"), "reqID")
.withColumn("result", $"endTime" - $"startTime")
输出
+-----+-------------+-------------+------+
|reqID| startTime| endTime|result|
+-----+-------------+-------------+------+
| id3|1577085247000|1577085260000| 13000|
| id4|1577085263000|1577085278000| 15000|
+-----+-------------+-------------+------+