如何根据持续时间(例如1h)聚合来自Kafka主题的消息?



我们正在以每秒几百条的速率将消息流式传输到Kafka主题。每条消息都有时间戳和有效负载。最终,我们希望将一个小时的数据(基于消息的时间戳)聚合到拼花文件中,并将它们上传到廉价的远程存储(对象存储)。

一种简单的方法是让消费者简单地从主题中读取消息,并在内存中进行聚合/卷取,一旦有一个小时的数据,生成并上传parquet文件。

然而,如果消费者崩溃或需要重新启动,我们将丢失当前小时开始以来的所有数据-如果我们使用enable.auto.commit=trueenable.auto.commit=false并在一批消息后手动提交。

对于消费者来说,一个简单的解决方案可能是保持阅读,直到一个小时的数据在内存中,做parquet文件生成(并上传它),然后调用commitAsync()commitSync()(使用enable.auto.commit=false并使用外部存储来跟踪偏移量)。

但这将导致数百万条消息至少在一个小时内没有提交。我想知道卡夫卡是否允许"延迟"。这么多的消息/这么长时间的消息提交(我似乎记得在什么地方读到过这个,但我的生命我再也找不到它了)。

实际问题:

a)在Kafka可能认为消费者被破坏或停止向消费者提供额外消息之前,未提交的消息数量(或持续时间)是否有限制?这似乎是违反直觉的,因为enable.auto.commit=false和管理消费者中的偏移量的目的是什么(例如,在外部数据库的帮助下)。

b)在健壮性/冗余性和可扩展性方面,在消费者组中有多个消费者是很好的;如果我理解正确的话,每个分区永远不可能有多个Consumer。如果我们随后运行多个Consumer并为每个主题配置多个分区,我们就不能进行这种聚合/卷升,因为现在消息将分布在Consumer之间。解决此问题的唯一方法是为属于此类一小时组的所有消息提供额外的(外部)临时存储,对吗?

你可以用TimestampExtractor配置Kafka Streams,将数据聚合到不同类型的时间窗口

到parquet文件中,并将它们上传到廉价的远程存储(对象存储)。

Kafka Connect S3 sink,或Pinterest Secor工具,已经这样做了

最新更新