Kafka流绑定内存RocksDBConfig



我试图了解Kafka Streams的内部是如何在缓存和RocksDB(状态存储(方面工作的。

KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
.groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
.reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(timeToWaitForMoreEvents),
Suppressed.BufferConfig.unbounded().withLoggingDisabled()));

在我的拓扑结构的上述部分中,我使用了一个具有300个分区的Kafka主题。该应用程序部署在OpenShift上,内存分配为4GB。我注意到应用程序的内存不断增加,直到最终出现OOMKILLED。经过一些研究,我读到自定义RocksDB配置是我应该实现的,因为默认大小对我的应用程序来说太大了。记录首先进入缓存(由cache_MAX_BYTES_BUFFERING_CONFIG和COMMIT_INTERVAL_MS_CONFIG配置(,然后进入状态存储。

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
private static org.rocksdb.Cache cache = new org.rocksdb.LRUCache(1 * 1024 * 1024L, -1, false, 0);
private static org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(1 * 1024 * 1024L, cache);
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
// These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
tableConfig.setBlockCache(cache);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setWriteBufferManager(writeBufferManager);
// These options are recommended to be set when bounding the total memory
tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
tableConfig.setPinTopLevelIndexAndFilter(true);
tableConfig.setBlockSize(2048L);
options.setMaxWriteBufferNumber(2);
options.setWriteBufferSize(1 * 1024 * 1024L);
options.setTableFormatConfig(tableConfig);
}
@Override
public void close(final String storeName, final Options options) {
// Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
}
}

对于每个时间窗口分段,默认情况下会创建三个分段。如果我使用300个分区,由于每个分区将创建3个时间窗口段,因此将创建900个RocksDB实例。我的理解是正确的吗?

Memory allocated in OpenShift / RocksDB instances => 4096MB / 900 => 4.55 MB

(WriteBufferSize * MaxWriteBufferNumber) + BlockCache + WriteBufferManager => (1MB * 2) + 1MB + 1MB => 4MB

BoundedMemoryRocksDBConfig.java是用于RocksDB的每个实例,还是用于所有实例?

如果您从一个有300个分区的主题中消费,并且使用分段状态存储,即在DSL中使用时间窗口,那么最终将获得900个RocksDB实例。如果只使用一个Kafka Streams客户端,即不扩展,那么所有900个RocksDB实例都将位于同一计算节点上。

BoundedMemoryRocksDBConfig限制RocksDB每个Kafka Streams客户端使用的内存。这意味着,如果只使用一个Kafka Streams客户端,BoundedMemoryRocksDBConfig会限制所有900个实例的内存。

我的理解正确吗?

在OpenShift/RocksDB实例中分配的内存=>4096MB/900=>455百万

(WriteBufferSize*MaxWriteBufferNumber(+BlockCache+WriteBufferManager=>(1MB*2(+1MB+1MB=>4MB

不,这是不对的。

如果将Cache传递给WriteBufferManager,则memtables所需的大小也会根据缓存进行计数(请参阅BoundedMemoryRocksDBConfig和RocksDB文档中的脚注1(。因此,传递给缓存的大小是内存表和块缓存的限制。由于将缓存和写缓冲区管理器传递给同一计算节点上的所有实例,因此所有900个实例都受传递给缓存的大小的限制。例如,如果指定大小为4 GB,则所有900个实例(假设一个Kafka Streams客户端(使用的总内存将限制为4 GB。

请注意,传递到缓存的大小并不是一个严格的限制。尽管缓存构造函数中的布尔参数为您提供了强制执行严格限制的选项,但如果由于Kafka Streams使用的RocksDB版本中的错误,写入缓冲区内存也计入缓存,则强制执行将不起作用。

使用Kafka Streams 2.7.0,您将可以通过JMX公开的指标来监控RocksDB内存消耗。查看KIP-607了解更多详细信息。

最新更新