获取在流分析中接收一系列消息的时间范围



我正在流式传输包含 posix/epoch 时间字段的事件消息。我正在尝试计算我在哪个时间范围内从设备收到了一系列消息。

让我们假设以下(简化的(输入:

[
{ "deviceid":"device01", "epochtime":1500975613660 },
{ "deviceid":"device01", "epochtime":1500975640194 },
{ "deviceid":"device01", "epochtime":1500975649627 },
{ "deviceid":"device01", "epochtime":1500994473225 },
{ "deviceid":"device01", "epochtime":1500994486725 }
]

我的计算结果应该是每个设备 ID 的 {deviceid, start, end} 之类的消息。我假设一个新的时间范围开始,如果两个事件之间的时间间隔超过一小时。在我的示例中,这将导致两次传输:

[
{"deviceid":"device01", "start":1500975613660, "end"=1500975649627},
{"deviceid":"device01", "start":500994473225, "end"=1500994486725}
]

我可以根据文档 https://msdn.microsoft.com/en-us/library/azure/mt573293.aspx 中的示例 2 转换纪元时间。但是,我不能在子查询中将转换后的时间戳与 LAG 函数一起使用。上一个时间的所有值在输出中都是空的。

WITH step1 AS (
SELECT
[deviceid] AS deviceId,
System.Timestamp AS ts,
LAG([ts]) OVER (LIMIT DURATION(hour, 24)) as previousTime
FROM 
input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') 
)

我不确定如何执行计算以及最佳方法是什么。我需要弄清楚事件系列的开始和结束。

任何帮助都非常感谢。

我稍微修改了下面的查询以获得预期的结果:

WITH STEP1 AS (
SELECT   
[deviceid] AS deviceId,
System.Timestamp AS ts,
LAG(DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') ) OVER (LIMIT DURATION(hour, 24)) as previousTime
FROM 
input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') 
)
SELECT * from STEP1

问题是"ts"是在当前步骤中定义的,但是在使用LAG时,您正在查看来自FROM语句的原始消息,并且它不包含"ts"变量。

如果您有任何问题,请告诉我。

谢谢

JS - Azure 流分析团队

最新更新