从使用者启动的主题中获取最新值,然后正常继续



我们有一个 Kafka 制作器,它以非常高的频率为保留时间 = 10 小时的主题生成键控消息。这些消息是实时更新,使用的键是值已更改的元素的 ID。因此,该主题充当更改日志,并且将有许多重复的键。

现在,我们试图实现的是,当 Kafka 消费者启动时,无论最后一个已知状态(新消费者、崩溃、重启等)如何,它都会以某种方式构造一个表,其中包含主题中所有键的最新值,然后照常监听新的更新,保持 Kafka 服务器上的最小负载,让消费者完成大部分工作。我们尝试了很多方法,但没有一种似乎是最好的。

我们尝试了什么:

1 个更新日志主题 + 1 个紧凑主题:

  1. 生产者向事务中包装的两个主题发送相同的消息,以确保成功发送。
  2. 使用者启动并请求更改日志主题的最新偏移量。
  3. 从开始使用压缩主题来构造表。
  4. 自请求的偏移量以来继续使用更改日志。

缺点:

即使
  • 将日志压缩频率设置为尽可能高,压缩主题中存在重复项的可能性也非常高。
  • Kakfa 服务器上的主题数为 x2。

KSQL:

使用 KSQL,我们要么必须将 KTable 重写为主题,以便消费者可以看到它(额外主题),要么我们需要消费者使用 KSQL Rest Server 执行 KSQLSELECT并查询表(不如 Kafka API 那么快和高性能)。

Kafka Consumer API:

使用者从头开始并消费主题。这非常有效,但使用者必须使用 10 小时的更改日志来构造最后一个值表。

夫卡流:

通过使用 KTables,如下所示:

KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));

Kafka Streams 将在每个 KTable (名为{consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog)在 Kafka 服务器上创建 1 个主题,这将产生大量的主题,因为我们有大量的消费者。

从我们尝试的内容来看,看起来我们需要增加服务器负载或消费者启动时间。难道没有一种"完美"的方式来实现我们正在努力做的事情吗?

提前谢谢。

通过使用

KTables,Kafka Streams 将在每个 KTable 的 Kafka 服务器上创建 1 个主题,这将产生大量的主题,因为我们有大量的消费者。

如果您只是将现有主题读入KTable(通过StreamsBuilder#table()),那么 Kafka Streams 不会创建额外的主题。KSQL 也是如此。

如果您能澄清您究竟想用 KTable 做什么,这将有所帮助。显然,您正在做一些确实会导致创建其他主题的事情?

1 个更新日志主题 + 1 个紧凑主题:

你为什么考虑有两个独立的主题?通常,应始终压缩更改日志主题。鉴于您的用例描述,我认为没有理由不应该是:

现在,我们试图实现的是,当 Kafka 消费者启动时,无论最后的已知状态(新消费者、崩溃、重启等)如何,它都会以某种方式构建一个表,其中包含主题中所有键的最新值,然后像往常一样继续侦听新的更新 [...]

因此,压缩对您的用例非常有用。它还可以防止您描述的此问题:

使用者从头开始并消费主题。这非常有效,但使用者必须使用 10 小时的更改日志来构造最后一个值表。

请注意,要重建最新的表值,Kafka Streams、KSQL 和 Kafka Consumer 的所有三个都必须完整地(从头到尾)读取表的基础主题。如果未压缩该主题,则可能确实需要很长时间,具体取决于数据量、主题保留设置等。

从我们尝试的内容来看,看起来我们需要增加服务器负载或消费者启动时间。难道没有一种"完美"的方式来实现我们正在努力做的事情吗?

在不了解您的用例的情况下,尤其是在填充 KTable 后您想用它们做什么,我的答案是:

  • 确保"更改日志主题"也被压缩。
  • 首先尝试 KSQL。如果这不能满足您的需求,请尝试 Kafka Streams。如果这不能满足您的需求,请尝试 Kafka 消费者。

例如,如果 Kafka Consumer 应该对"表"数据进行任何有状态处理,我不会使用它,因为 Kafka Consumer 缺乏容错有状态处理的内置功能。

消费者从头开始并使用主题。这奏效了 完美,但消费者必须使用 10 小时的更改日志 构造最后一个值表。

在应用程序首次启动时,您所说的是正确的。

若要避免在每次重新启动期间出现这种情况,请将键值数据存储在文件中。

例如,您可能希望使用持久映射(如 MapDB)。

由于您为使用者提供了group.id并且定期或在每条记录存储在映射中之后提交偏移量,因此下次应用程序重新启动时,它将从该group.id的最后一个提交偏移量中读取它。

因此,花费大量时间的问题仅在最初(第一次)发生。只要你有文件,你就不需要从头开始使用。

如果文件不存在或删除,只需seekToBeginningKafkaConsumer并重新构建即可。

在某个地方,您需要存储此键值以供检索,为什么它不能成为持久存储?


如果您出于某种原因想要使用 Kafka 流,那么另一种选择(不像上面那么简单)是使用持久支持的存储。

例如,持久全局存储。

streamsBuilder.addGlobalStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(topic), keySerde, valueSerde), topic, Consumed.with(keySerde, valueSerde), this::updateValue);

PS:在存储偏移量的目录中会有一个名为.checkpoint的文件。如果主题在中间被删除,你会得到OffsetOutOfRangeException.您可能希望避免这种情况,也许通过使用UncaughtExceptionHandler

有关详细信息,请参阅 https://stackoverflow.com/a/57301986/2534090。

最后

为此,最好将消费者与持久文件一起使用,而不是使用Streams,因为它提供了简单性。

相关内容

  • 没有找到相关文章

最新更新