Context: We use Kinesis Analytics to process sensor data and detect anomalies in it.
Goal: We need to identify sensors that have not sent data within the past X minutes.
We tried the following approaches in Kinesis Analytics SQL, without success:
The Stagger Window technique works for the first 3 test cases, but not for test case 4:
CREATE OR REPLACE PUMP "STREAM_PUMP_ALERT_DOSCONNECTION" AS INSERT INTO "INTERMEDIATE_SQL_STREAM" SELECT STREAM "deviceID" as "device_key", count("deviceID") as "device_count", ROWTIME as "time" FROM "INTERMEDIATE_SQL_STREAM_FOR_ROOM"
WINDOWED BY STAGGER (
PARTITION BY "deviceID", ROWTIME RANGE INTERVAL '1' MINUTE);
The left join and group by queries below also do not work for all test cases:
Query 1:
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
SELECT STREAM
ROWTIME as "resultrowtime",
Input2."device_key" as "device_key",
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
Input1."device_key" = Input2."device_key"
AND Input1.ROWTIME <> Input2.ROWTIME;
Query 2:
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
SELECT STREAM
ROWTIME as "resultrowtime",
Input2."device_key" as "device_key"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
Input1."device_key" = Input2."device_key"
AND TSDIFF(Input1.ROWTIME, Input2.ROWTIME) > 0;
Query 3:
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
SELECT STREAM
ROWTIME as "resultrowtime",
Input2."device_key" as "device_key"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
Input1."device_key" = Input2."device_key"
AND Input1.ROWTIME = Input2.ROWTIME;
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP2" AS
INSERT INTO "DIS_CONN_DEST_SQL_STREAM_ALERT"
SELECT STREAM "device_key", "count"
FROM (
SELECT STREAM
"device_key",
COUNT(*) as "count"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM2
GROUP BY FLOOR(INTERMEDIATE_SQL_STREAM_FOR_ROOM2.ROWTIME TO MINUTE), "device_key"
)
WHERE "count" = 1;
Here are the test cases for your reference:
Test case 1:
- Device 1 and Device 2 send data continuously to Kinesis Analytics.
- After X minutes, Device 2 continues to send data, but Device 1 does not.
Expected output for test case 1:
We want Kinesis Analytics to flag Device 1 so that we know which device has not sent data.
Test case 2 (interval - 10 minutes):
- Device 1 sends data at 09:00.
- Device 2 sends data at 09:02.
- Device 2 sends data again at 09:11, but Device 1 does not send any data.
Expected output for test case 2:
We want Kinesis Analytics to flag Device 1 so that we know which device has not sent data.
Test case 3 (interval - 10 minutes):
- Device 1 and Device 2 send data continuously to Kinesis Analytics.
- Then neither device (1 nor 2) sends any data for the next 15 minutes.
Expected output for test case 3:
We want Kinesis Analytics to flag Device 1 and Device 2 so that we know which devices have not sent data.
Test case 4 (interval - 10 minutes):
- Device 1 sends data at 09:00.
- Device 2 sends data at 09:02.
- Device 1 sends data again at 09:04.
- Device 2 sends data again at 09:06.
- Then no further data is sent.
Expected output for test case 4:
We want Kinesis Analytics to flag Device 1 at 09:14 and Device 2 at 09:16, so that we get each disconnected device (i.e., the device that stopped sending data) exactly one interval after its last data point.
Note: AWS support pointed us to simple queries that do not answer the question. It seems they can only help us with the exact query if we upgrade our support plan.
I'm not familiar with all the ways AWS extends or modifies Apache Flink, but open-source Flink does not provide a simple way to detect whether sources have stopped sending data. One solution is to use something like a ProcessFunction with processing-time timers to detect the absence of data.
There is an example of this in the documentation: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function/#example
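As a rough illustration of that idea, here is a minimal sketch of a KeyedProcessFunction that re-arms a processing-time timer on every element and emits an alert when the timer fires. It assumes the stream has already been keyed by device ID; the class name InactivityDetector, the 10-minute timeout, and the alert string are placeholders of mine, not anything from AWS or from the question.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the device key once no element has arrived for that key within TIMEOUT_MS.
// The stream is expected to be keyed by device ID (a String) before this operator.
public class InactivityDetector<T> extends KeyedProcessFunction<String, T, String> {

    private static final long TIMEOUT_MS = 10 * 60 * 1000L; // 10-minute interval

    // Timestamp of the currently registered timer for this device, if any.
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("inactivity-timer", Long.class));
    }

    @Override
    public void processElement(T reading, Context ctx, Collector<String> out) throws Exception {
        // Fresh data arrived for this device: cancel the pending timer, if there is one.
        Long pendingTimer = timerState.value();
        if (pendingTimer != null) {
            ctx.timerService().deleteProcessingTimeTimer(pendingTimer);
        }
        // Re-arm the timer to fire TIMEOUT_MS after this element was processed.
        long newTimer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        timerState.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // The timer fired, so this device sent nothing for TIMEOUT_MS: raise an alert.
        out.collect("Device " + ctx.getCurrentKey() + " has stopped sending data");
        timerState.clear();
    }
}

With something like readings.keyBy(r -> r.getDeviceId()).process(new InactivityDetector<>()) (readings and getDeviceId are likewise hypothetical names), each device key gets its own timer, so in test case 4 Device 1 would be flagged around 09:14 and Device 2 around 09:16, in processing time rather than event time.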