我正在流式传输包含 posix/epoch 时间字段的事件消息。我正在尝试计算我在哪个时间范围内从设备收到了一系列消息。
让我们假设以下(简化的(输入:
[
{ "deviceid":"device01", "epochtime":1500975613660 },
{ "deviceid":"device01", "epochtime":1500975640194 },
{ "deviceid":"device01", "epochtime":1500975649627 },
{ "deviceid":"device01", "epochtime":1500994473225 },
{ "deviceid":"device01", "epochtime":1500994486725 }
]
我的计算结果应该是每个设备 ID 的 {deviceid, start, end} 之类的消息。我假设一个新的时间范围开始,如果两个事件之间的时间间隔超过一小时。在我的示例中,这将导致两次传输:
[
{"deviceid":"device01", "start":1500975613660, "end"=1500975649627},
{"deviceid":"device01", "start":500994473225, "end"=1500994486725}
]
我可以根据文档 https://msdn.microsoft.com/en-us/library/azure/mt573293.aspx 中的示例 2 转换纪元时间。但是,我不能在子查询中将转换后的时间戳与 LAG 函数一起使用。上一个时间的所有值在输出中都是空的。
WITH step1 AS (
SELECT
[deviceid] AS deviceId,
System.Timestamp AS ts,
LAG([ts]) OVER (LIMIT DURATION(hour, 24)) as previousTime
FROM
input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z')
)
我不确定如何执行计算以及最佳方法是什么。我需要弄清楚事件系列的开始和结束。
任何帮助都非常感谢。
我稍微修改了下面的查询以获得预期的结果:
WITH STEP1 AS (
SELECT
[deviceid] AS deviceId,
System.Timestamp AS ts,
LAG(DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z') ) OVER (LIMIT DURATION(hour, 24)) as previousTime
FROM
input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z')
)
SELECT * from STEP1
问题是"ts"是在当前步骤中定义的,但是在使用LAG时,您正在查看来自FROM语句的原始消息,并且它不包含"ts"变量。
如果您有任何问题,请告诉我。
谢谢
JS - Azure 流分析团队