我正在设置与kafka Broks连接的Apache Flink。
我以随机顺序收到以下消息:
- 消息(timestamp = [..],index = 1,somedata = [..](
- 消息(timestamp = [..],index = 2,somedata = [..](
- 消息(timestamp = [..],index = 3,somedata = [..](
- 消息(timestamp = [..],index = 2,somedata = [..](
- 消息(timestamp = [..],index = 3,somedata = [..](
- 消息(timestamp = [..],index = 1,somedata = [..](
我来自Kafka的记录获得了index
字段。
在我的应用程序中,我需要计算具有相同ID的记录的最后两个,然后立即发送响应。
例如,这两个:
- 消息(timestamp = [..],index = 1,somedata = [..](
- 消息(timestamp = [..],index = 1,somedata = [..](
使用相同的索引字段存储和计算最后两个记录的最佳方法是什么?你能告诉我一些提示吗?
您的要求尚不完全清楚,但是您可能想了解的机制是使用keyBy(e -> e.index)
通过索引字段将流进行分组/分配流程,以及用于记住最后一个事件的键控状态(或两个(对于索引的每个值。
如果您需要考虑时间戳,即使在索引的单个值之内,事件流也是秩序的,那么您将需要先按时间戳将流进行排序。在这种情况下,如果您使用Flink SQL进行分类,您将有更轻松的时间,然后您可以使用Match_Reckize执行模式识别,尽管对于如此简单的模式来说,这可能是过分的。这是如何进行排序的一个示例。