我们有一个 Kafka 制作器,它以非常高的频率为保留时间 = 10 小时的主题生成键控消息。这些消息是实时更新,使用的键是值已更改的元素的 ID。因此,该主题充当更改日志,并且将有许多重复的键。
现在,我们试图实现的是,当 Kafka 消费者启动时,无论最后一个已知状态(新消费者、崩溃、重启等)如何,它都会以某种方式构造一个表,其中包含主题中所有键的最新值,然后照常监听新的更新,保持 Kafka 服务器上的最小负载,让消费者完成大部分工作。我们尝试了很多方法,但没有一种似乎是最好的。
我们尝试了什么:
1 个更新日志主题 + 1 个紧凑主题:
- 生产者向事务中包装的两个主题发送相同的消息,以确保成功发送。
- 使用者启动并请求更改日志主题的最新偏移量。
- 从开始使用压缩主题来构造表。
- 自请求的偏移量以来继续使用更改日志。
缺点:
即使- 将日志压缩频率设置为尽可能高,压缩主题中存在重复项的可能性也非常高。
- Kakfa 服务器上的主题数为 x2。
KSQL:
使用 KSQL,我们要么必须将 KTable 重写为主题,以便消费者可以看到它(额外主题),要么我们需要消费者使用 KSQL Rest Server 执行 KSQLSELECT
并查询表(不如 Kafka API 那么快和高性能)。
Kafka Consumer API:
使用者从头开始并消费主题。这非常有效,但使用者必须使用 10 小时的更改日志来构造最后一个值表。
卡夫卡流:
通过使用 KTables,如下所示:
KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));
Kafka Streams 将在每个 KTable (名为{consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog
)在 Kafka 服务器上创建 1 个主题,这将产生大量的主题,因为我们有大量的消费者。
从我们尝试的内容来看,看起来我们需要增加服务器负载或消费者启动时间。难道没有一种"完美"的方式来实现我们正在努力做的事情吗?
提前谢谢。
KTables,Kafka Streams 将在每个 KTable 的 Kafka 服务器上创建 1 个主题,这将产生大量的主题,因为我们有大量的消费者。
如果您只是将现有主题读入KTable
(通过StreamsBuilder#table()
),那么 Kafka Streams 不会创建额外的主题。KSQL 也是如此。
如果您能澄清您究竟想用 KTable 做什么,这将有所帮助。显然,您正在做一些确实会导致创建其他主题的事情?
1 个更新日志主题 + 1 个紧凑主题:
你为什么考虑有两个独立的主题?通常,应始终压缩更改日志主题。鉴于您的用例描述,我认为没有理由不应该是:
现在,我们试图实现的是,当 Kafka 消费者启动时,无论最后的已知状态(新消费者、崩溃、重启等)如何,它都会以某种方式构建一个表,其中包含主题中所有键的最新值,然后像往常一样继续侦听新的更新 [...]
因此,压缩对您的用例非常有用。它还可以防止您描述的此问题:
使用者从头开始并消费主题。这非常有效,但使用者必须使用 10 小时的更改日志来构造最后一个值表。
请注意,要重建最新的表值,Kafka Streams、KSQL 和 Kafka Consumer 的所有三个都必须完整地(从头到尾)读取表的基础主题。如果未压缩该主题,则可能确实需要很长时间,具体取决于数据量、主题保留设置等。
从我们尝试的内容来看,看起来我们需要增加服务器负载或消费者启动时间。难道没有一种"完美"的方式来实现我们正在努力做的事情吗?
在不了解您的用例的情况下,尤其是在填充 KTable 后您想用它们做什么,我的答案是:
- 确保"更改日志主题"也被压缩。
- 首先尝试 KSQL。如果这不能满足您的需求,请尝试 Kafka Streams。如果这不能满足您的需求,请尝试 Kafka 消费者。
例如,如果 Kafka Consumer 应该对"表"数据进行任何有状态处理,我不会使用它,因为 Kafka Consumer 缺乏容错有状态处理的内置功能。
消费者从头开始并使用主题。这奏效了 完美,但消费者必须使用 10 小时的更改日志 构造最后一个值表。
在应用程序首次启动时,您所说的是正确的。
若要避免在每次重新启动期间出现这种情况,请将键值数据存储在文件中。
例如,您可能希望使用持久映射(如 MapDB)。
由于您为使用者提供了group.id
并且定期或在每条记录存储在映射中之后提交偏移量,因此下次应用程序重新启动时,它将从该group.id
的最后一个提交偏移量中读取它。
因此,花费大量时间的问题仅在最初(第一次)发生。只要你有文件,你就不需要从头开始使用。
如果文件不存在或删除,只需seekToBeginning
KafkaConsumer
并重新构建即可。
在某个地方,您需要存储此键值以供检索,为什么它不能成为持久存储?
如果您出于某种原因想要使用 Kafka 流,那么另一种选择(不像上面那么简单)是使用持久支持的存储。
例如,持久全局存储。
streamsBuilder.addGlobalStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(topic), keySerde, valueSerde), topic, Consumed.with(keySerde, valueSerde), this::updateValue);
PS:在存储偏移量的目录中会有一个名为.checkpoint
的文件。如果主题在中间被删除,你会得到OffsetOutOfRangeException
.您可能希望避免这种情况,也许通过使用UncaughtExceptionHandler
有关详细信息,请参阅 https://stackoverflow.com/a/57301986/2534090。
最后
为此,最好将消费者与持久文件一起使用,而不是使用Streams,因为它提供了简单性。