我正在寻求建议。我必须使用GlobalKtables来消费重量级话题,并与我的处理器进行连接。Joins涉及80Go-8个分区主题和 50Go单个分区当我的java处理器启动时,它会基于这两个主题创建两个GlobalKTables。在消费这个时,我做我的生意,一切都很好。 我用每个主题的非常非常少量的数据对它进行了测试,结果很好。这是GKT init:的狙击 但是,当部署在我的env上时,我的应用程序似乎已经"初始化"GlobalKTables 8个小时了。而且似乎没有处理任何新的事件。 问题: 我曾想过使用数据库而不是GlobalKTables来防止这种初始化时间,但它需要重新开发处理器的主要部分。 环境:Java 14Kafka 2.5.1myStreamBuilder.globalTable(myTopicName, Consumed.with(Serdes.String(), myDbziumSerdes))
使用GlobalKTable
意味着Kafka主题将完全复制到属于application.id
的每个KafkaStreams
,无论是在内存中还是在磁盘上(即RocksDB(,具体取决于您如何实现StateStore
。
GlobalKTable
用于使用可以不同于输入流的键的连接键来执行与KStream
的连接,而不必重新划分输入主题。
通常,GlobalKTable
用于具体化包含参考数据或随时间缓慢变化的数据的主题。
GlobalKTable
的一个缺点是KafkaStreams只使用一个全局线程来管理所有全局表。这就是为什么多个GlobalKTable
的初始化可能需要一段时间,几秒钟到几个小时。
要优化初始化,您可以覆盖/增加这些设置:
// for global consumer (default values).
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 1048576);
streamsConfig.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 65536);
此外,应该压缩GlobalKTable
的输入主题(即cleanup.policy=compact
(。此外,您可以尝试减少主题的片段大小(例如,100MB(,以增加片段旋转的频率,从而进行更积极的压缩。