The use case is to capture the time delta between streaming sensor entries for the same station and part, compare it against a tolerance, and potentially trigger an alert when it falls out of range. I am currently parsing the fields into a DataFrame and registering it as a table so I can run a SQL query that uses the LAG function.
# Split each pipe-delimited record into a (station, part, event) tuple
events = rawFilter.map(lambda x: x.split("|")).map(lambda x: (x[0], x[1], x[2]))

eventSchema = StructType([
    StructField("station", StringType(), False),
    StructField("part", StringType(), False),
    StructField("event", TimestampType(), False)])

# Build a DataFrame from the parsed tuples and expose it to SQL
eventDF = sqlContext.createDataFrame(events, eventSchema)
eventDF.registerTempTable("events_table")
%sql select station, part, event, prev_event,
       cast(event as double) - cast(prev_event as double) as CycleTime
     from (select station, part, event,
             LAG(event) OVER (PARTITION BY station, part ORDER BY event) as prev_event
           from events_table) x limit 10
Example Streaming Sensor Data:
station1|part1|<timestamp>
station2|part2|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>
What I am trying to understand is how to perform the window function within the DataFrame itself, so that the resulting table already has the time delta calculated.
Part 2 of this question is understanding how to handle the case where the part changes. In that case the CycleTime should not be calculated, or should stop; however, the time delta at the same station between two different parts is a separate calculation called ChangeOver. I don't see how Spark Streaming could do this, since the window could stretch out for days before the part changes, so I was thinking of pushing the data into HBase or something similar to calculate the ChangeOver.
Window definitions on DataFrames closely follow the SQL conventions: the partitionBy, orderBy, rangeBetween and rowsBetween methods correspond to the equivalent SQL clauses.
from pyspark.sql.functions import col, lag, unix_timestamp
from pyspark.sql.window import Window

rawDF = sc.parallelize([
    ("station1", "part1", "2015-01-03 00:11:02"),
    ("station2", "part2", "2015-02-01 10:20:10"),
    ("station3", "part3", "2015-03-02 00:30:00"),
    ("station1", "part1", "2015-05-01 01:07:00"),
    ("station1", "part1", "2015-01-13 05:16:10"),
    ("station1", "part1", "2015-11-20 10:22:40"),
    ("station3", "part3", "2015-09-04 03:15:22"),
    ("station1", "part1", "2015-03-05 00:41:33")
]).toDF(["station", "part", "event"])

# Convert the event string to seconds since the epoch so the delta is a plain subtraction
eventDF = rawDF.withColumn("event", unix_timestamp(col("event")))

# Equivalent of PARTITION BY station ORDER BY event
w = Window.partitionBy(col("station")).orderBy(col("event"))

(eventDF
    .withColumn("prev_event", lag(col("event")).over(w))
    .withColumn("cycle_time", col("event") - col("prev_event")))