美好的一天,
我想找出Kafka队列是否可以容纳数据几秒钟,而不是发布数据。
我收到来自kafka主题的消息,解析数据后,我将其保存在内存中一段时间(10秒)(这是随着唯一消息的到来而构建的),每个消息都有其自己的计时器),我希望Kafka告诉我该消息已经过期(10秒),以便我可以继续执行其他任务。
但是,由于Flink/Kafka是事件驱动的,所以我希望Kafka有某种圆形时正时轮,可以在10秒后向消费者重现钥匙。
关于我如何使用Flink Window或Kafka功能对此进行构造的任何想法?
问候
关于您的初始问题:
我想找出Kafka队列是否可以存放数据几秒钟,而不是发布数据
您可以将log.cleanup.policy
设置为delete
(这是默认值),然后将retention.ms
从默认的604800000
(1周)更改为10000
。
您可以再次解释您要检查什么,Regards
部分之后是什么意思?
您可以靠近Kafka Streams库。https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html,https://kafka.apache.org/21/documentation/documentation/documentation/streams/streams/streams/developer-guide-guide-guide/processor-api-api-api-api-api。html。
使用Kafka流,您可以完成很多复杂的事件处理工作。处理器API是较低级别的API,并为您提供更高的灵活性,将每个处理消息放在State Store(Kafka Streams Axpraction,将复制到ChangElog主题)中,然后使用Punctuator
,您可以检查Message 是否已过期。