GlobalK表初始化持续时间



我正在寻求建议。我必须使用GlobalKtables来消费重量级话题,并与我的处理器进行连接。Joins涉及80Go-8个分区主题和

50Go单个分区当我的java处理器启动时,它会基于这两个主题创建两个GlobalKTables。在消费这个时,我做我的生意,一切都很好。

我用每个主题的非常非常少量的数据对它进行了测试,结果很好。这是GKT init:的狙击

myStreamBuilder.globalTable(myTopicName, Consumed.with(Serdes.String(), myDbziumSerdes))

但是,当部署在我的env上时,我的应用程序似乎已经"初始化"GlobalKTables 8个小时了。而且似乎没有处理任何新的事件。

问题:

  • 我可以使用GlobalKTables吗
  • 装载处理器对内存占用的影响是什么这些GlobalKTables
  • 还有其他更好的方法吗

我曾想过使用数据库而不是GlobalKTables来防止这种初始化时间,但它需要重新开发处理器的主要部分。

环境:Java 14Kafka 2.5.1

使用GlobalKTable意味着Kafka主题将完全复制到属于application.id的每个KafkaStreams,无论是在内存中还是在磁盘上(即RocksDB(,具体取决于您如何实现StateStore

GlobalKTable用于使用可以不同于输入流的键的连接键来执行与KStream的连接,而不必重新划分输入主题。

通常,GlobalKTable用于具体化包含参考数据或随时间缓慢变化的数据的主题。

GlobalKTable的一个缺点是KafkaStreams只使用一个全局线程来管理所有全局表。这就是为什么多个GlobalKTable的初始化可能需要一段时间,几秒钟到几个小时。

要优化初始化,您可以覆盖/增加这些设置:

// for global consumer (default values).
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 1048576);
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 65536);

此外,应该压缩GlobalKTable的输入主题(即cleanup.policy=compact(。此外,您可以尝试减少主题的片段大小(例如,100MB(,以增加片段旋转的频率,从而进行更积极的压缩。

相关内容

  • 没有找到相关文章

最新更新