我的应用程序中运行了一个消费者API线程(运行在不同的节点上(,我的目标是消费来自Kafka的传入记录,该记录将具有唯一的id作为密钥。正在寻找一个选项来存储Kafka Consumer API处理的记录,以便我的订单可以使用它进行显示。
计划使用Stream API来处理和创建Kafka代理上的Ktable。如果我选择使用持久状态存储,它还会占用我的本地应用程序内存吗?若是这样的话,它是不是像临时的一样,在处理记录后立即释放内存?尝试避免应用程序中的任何内存问题,并查看哪个状态存储选项最适合我的用例。非常感谢!
如果我选择使用持久状态存储,它还会消耗我的本地应用程序内存吗?
它取决于底层存储,可以是:RocksDB、内存中的哈希图或其他数据结构。
如果是,它是像临时的,并在处理记录后立即释放内存吗?
无
顺便说一句:您不应该在任何Kafka代理上部署您的Kafka Streams应用程序。
在Kafka broker 上创建一个Ktable
客户端将填充KTable
。
如果我选择使用持久状态存储,它还会消耗我的本地应用程序内存吗?
即使RocksDB显然也会使用一些本地内存(请注意,内存是堆外的,因为RocksDB是一个通过JNI集成到JVM中的C++应用程序(。当然,您也可以根据需要对RocksDB进行微调/配置,并相应地限制其内存消耗。一般来说,RocksDB溢出到磁盘,因此您可以保持比主内存更大的状态。
它是临时的,并在处理记录后立即释放内存吗?
否。如果您将主题读取为变更日志,则每条记录都将是插入/更新/删除(通过tombstone删除,即value=null
(。假设每个记录都有唯一的ID,那么所有记录都将是插入的,KTable将无限增长。
你如何定义";处理过的";记录?如果可以确定记录何时被完全处理,那么可以使用KafkaProducer
将tombstone写入表输入主题。KafkaStreams最终会读取并处理这个tombstone,并从KTable中删除消息。当然,这将是一个异步删除,但它可能工作得很好。