如何使用Flink在30分钟内找到销售额超过1000的商品



假设数据有三个字段:itemID、timeStamp、totalQuantityOfSale。

每5分钟,我想报告过去30分钟内销售额超过1000的商品。

我想做的是

DataStream<String> process =  stream
.keyBy(Item::getId)
.timeWindow(Time.minutes(30), Time.minutes(5))
.process(new MyProcessFunction());

在这个MyProcessFunction((中,我正在考虑存储最后一个数据的timeStamp和totalQuantityOfSale。所以

if(currentData.getTimeStamp()>lastTimeStamp){
sum+=(currentData.getQuantityOfSale-lastTotalQuantityOfSale);
}

但是,这个MyProcessFunction应该扩展ProcessWindowFunction,它需要大量内存。我也不知道这种方法是正确的还是不正确的。有人能告诉我怎么做吗?还有其他更好的解决方案吗?非常感谢!

您想要的内容似乎可以在Flink SQL中表示为

SELECT
id, window_start, window_end, sum(quantityOfSale) AS totalQuantityOfSale
FROM TABLE(
HOP(TABLE events, DESCRIPTOR(timeStamp), INTERVAL '5' MINUTES, INTERVAL '30' MINUTES))
GROUP BY
id, window_start, window_end
HAVING sum(quantityOfSale) > 1000;

相关内容

  • 没有找到相关文章

最新更新