Kinesis Analytics: session or stagger-window batching without aggregation



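I'd like to use Kinesis Data Analytics (or some other AWS managed service) to batch records based on a filter criterion. The idea is that when a record comes in, we start a session window and batch any matching records that arrive within the next 15 minutes.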

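A stagger window is exactly what we want, except that we don't want to aggregate the data; we just want the records returned together.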

Ideally...

100 records spread over 15 min. (20 matching criteria) with first one at 10:02
|
v
At 10:17, the 20 matching records would be sent to the destination
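
To pin down the intended semantics, here is a minimal Python sketch of the batching described above, run offline over a list rather than on a live stream. The function name `batch_sessions`, the `matches` predicate, and the record layout (`child_id`, `category_id`, `ts`) are hypothetical and chosen only for illustration. It assumes records arrive in timestamp order, and it closes a window lazily (when the next record arrives or at the end of input) instead of on a timer as a real stream processor would.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=15)  # window length from the question


def batch_sessions(records, matches, window=WINDOW):
    """Group matching records into per-child batches without aggregating them.

    records: iterable of dicts with "child_id" and "ts" (datetime), in arrival order.
    matches: predicate that decides whether a record passes the filter.
    Returns a list of (emit_time, child_id, [records]) sorted by emit_time.
    """
    open_batches = {}  # child_id -> (window_start, [records])
    closed = []
    for rec in records:
        if not matches(rec):
            continue
        key = rec["child_id"]
        batch = open_batches.get(key)
        # The open window for this child has expired: emit it and start a new one.
        if batch is not None and rec["ts"] >= batch[0] + window:
            closed.append((batch[0] + window, key, batch[1]))
            batch = None
        if batch is None:
            # The first matching record for this child opens the window.
            batch = (rec["ts"], [])
            open_batches[key] = batch
        batch[1].append(rec)
    # End of input: emit whatever is still open.
    for key, (start, recs) in open_batches.items():
        closed.append((start + window, key, recs))
    closed.sort(key=lambda b: b[0])
    return closed


if __name__ == "__main__":
    # 100 records over just under 15 minutes, every 5th one matching, first at 10:02.
    start = datetime(2020, 1, 1, 10, 2)  # arbitrary example date
    records = [
        {"child_id": 1,
         "category_id": 888815186 if i % 5 == 0 else 0,
         "ts": start + timedelta(seconds=9 * i)}
        for i in range(100)
    ]
    batches = batch_sessions(records, lambda r: r["category_id"] == 888815186)
    for emit_time, child_id, recs in batches:
        print(emit_time.time(), child_id, len(recs))  # prints: 10:17:00 1 20
```

The point of the sketch is that the output of a window is the list of original records, not one aggregated row per partition.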

I've tried something like this:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"device_id" INTEGER, 
"child_id" INTEGER, 
"domain" VARCHAR(32),
"category_id" INTEGER,
"posted_at" DOUBLE,
"block" TIMESTAMP
);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM 
"device_id", 
"child_id", 
"domain", 
"category_id", 
"posted_at",
FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) as block
FROM "SOURCE_SQL_STREAM_001"
WHERE "category_id" = 888815186
WINDOWED BY STAGGER (
PARTITION BY "child_id", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) 
RANGE INTERVAL '15' MINUTE);

I keep getting an error for every column that is not in the aggregation:

From line 6, column 5 to line 6, column 12: Expression 'domain' is not being used in PARTITION BY sub clause of WINDOWED BY clause

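Kinesis Firehose was suggested as a solution, but it applies one blind window across all child_id values, so it could split a single session into several, which is exactly what I'm trying to avoid.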
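
To illustrate the concern, here is a small Python sketch of a single shared buffer that is opened by the first record from any child and flushed 15 minutes later. It is a simplified model written for this example, not Firehose's actual buffering logic; the helper names `blind_batches` and `at` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=15)


def blind_batches(records, window=WINDOW):
    """Batch records with one shared buffer, ignoring child_id.

    The buffer opens on the first record it receives and is flushed once
    `window` has elapsed. Records are assumed to be sorted by "ts".
    """
    batches, current, opened = [], [], None
    for rec in records:
        # The shared window has expired: flush it regardless of which child is mid-session.
        if opened is not None and rec["ts"] >= opened + window:
            batches.append(current)
            current, opened = [], None
        if opened is None:
            opened = rec["ts"]
        current.append(rec)
    if current:
        batches.append(current)
    return batches


def at(minute):
    """Timestamp `minute` minutes after 10:00 on an arbitrary example date."""
    return datetime(2020, 1, 1, 10, 0) + timedelta(minutes=minute)


records = [
    {"child_id": "A", "ts": at(2)},   # opens the shared buffer at 10:02
    {"child_id": "B", "ts": at(10)},  # B's session starts at 10:10
    {"child_id": "B", "ts": at(20)},  # still within 15 min of B's first record
]
batches = blind_batches(records)
for i, batch in enumerate(batches):
    print(i, [(r["child_id"], r["ts"].strftime("%H:%M")) for r in batch])
# 0 [('A', '10:02'), ('B', '10:10')]
# 1 [('B', '10:20')]
```

Child B's two records are only 10 minutes apart, yet they land in different batches because the flush at 10:17 was triggered by child A's window. A window keyed on child_id would keep them together.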

Any suggestions? It feels like this might not be the right tool.

Try LAST_VALUE("domain") as domain in the select clause.
