如何按照给定的主题顺序消费kafka主题



目前,我面临着一个问题,我需要根据给定的指定顺序消费来自不同主题的数据。假设我们有3个主题,分别是主题1、主题2和主题3。首先,我需要确保我按照以下顺序使用主题。

topic-2 > topic-3 > topic-1

应用程序应该监听并读取topic-2中的所有消息,然后继续从topic-3和topic-1中消费。再次需要执行此操作,直到主题收到消息。

这在Kafka中可能吗?

我不确定您是否有任何特殊的约束,但您可以尝试在您的应用程序代码中这样做:

consumeAll(topic2); // when done, consume next topic
consumeAll(topic3); // when done, consume next topic
consumeAll(topic1); 

但是请注意:如果新消息被附加到"相同"的多个主题中。时间,您将无法在应用程序代码中重新创建插入顺序,因为Kafka只保证顺序在单个主题分区,不能跨多个分区或多个主题。

你可以使用时间戳,它必须嵌入在Kafka消息中。所以你可以知道哪个是先出现的:

{ messageId, payload, timestamp }

使用时间戳意味着,你所有的生产者必须使用同步时钟。否则你可能会得到一些毫秒的漂移,正确的顺序就会消失。

但是你会遇到下一个问题:在开始处理之前你需要等待多长时间?(例如,如果topic3没有收到新消息)

另一件要考虑的事情:您收到来自topic3的新消息。现在该怎么办呢?根据您的描述,您无法处理它,因为必须先有来自topic2的消息。您希望等待来自topic2的新消息多长时间?

虽然也许只听topic2更好。只有当您从topic2接收到消息时,才开始从topic3获取消息。当你从话题3中得到一些东西时,你就开始听话题1。然后从头再来。

像这样:

while(true) {
msg2 = consumeAllNewMessages(topic2); // blocking call, until message received
msg3 = consumeAllNewMessages(topic3); // blocking, too
msg1 = consumeAllNewMessages(topic1); // blocking, too
process(msg2, msg3, msg1);
}

(当然你可以(也应该)用一些非阻塞代码代替阻塞调用,例如使用CompletableFuture)

但又:这只能保证您使用这些主题的顺序。但它并没有告诉你这些消息(跨主题)以何种顺序被发送到Kafka。这需要嵌入带有同步时钟的时间戳。

最新更新