我有一个Kafka Streams应用程序(Kafka Streams 2.1 + Kafka broker 2.0),它基于TimeWindows进行聚合,我使用抑制运算符来抑制结果的输出。
在我重新启动应用程序之前一切正常,它会将 KTABLE-SUPPRESS-STATE-STORE 的偏移量重置为 0 以恢复抑制状态,如预期的那样。但是每次重启它,它都会抛出一个OutOfMemoryError
,我想可能是堆大小不够,所以我用了一个更大的Xmx/Xms
,它可以工作一两次重启,然后OutOfMemoryError
又回来了。现在Xmx
大约是20G,我觉得这里有些不对劲。
代码片段:
TimeWindows windows = TimeWindows.of(windowSize).until(retentionHours.toMillis()).grace(graceHours);
KTable<Windowed<String>, MyStatistics> kTable = groupedBySerialNumber
.windowedBy(windows)
.aggregate(MyStatistics::new,
(sn, resList, stats) -> stats.addResources(resList).updateSN(sn),
Materialized.with(Serdes.String(), ArchiveSerdes.resourceStatistics()))
.suppress(Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes)));
而且我发现 KTABLE-SUPPRESS-STATE-STORE 中的记录键类似于 1234567j P,这是不可读的,但我想它是通过组合 SN 和窗口生成的,我认为这将使 KTABLE-SUPPRESS-STATE-STORE 冗余,因为每个 SN 每个窗口都会有多个记录。
我有两个问题:
- 如果
OutOfMemoryError
指示堆大小是否较小,如果是,如何限制速率,如果不是,这意味着什么? - KTABLE-SUPPRESS-STATE-STORE的密钥由哪个API定义,我如何或应该控制它?
谢谢!
编辑于 2019/4/16
错误堆栈跟踪为:
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
如果 OutOfMemoryError 指示堆大小是否较小,如果是,如何限制速率,如果不是,这意味着什么?
是的,没有足够的堆来分配应用程序运行所需的所有内存。我们不经常看到这种情况,抑制运算符是新的,所以我对此表示怀疑,但最好记住,基本上应用程序中的任何数据结构都可能负责。
诊断内存压力的最佳方法是执行"堆转储"。这基本上是将JVM的整个内存复制到一个文件中,以便您可以使用 https://www.eclipse.org/mat/这样的程序分析其内容。这将是一个学习曲线,但我想你会发现一些分析内存使用情况的工具通常非常方便。
您可以随时触发堆转储(有几种方法可以做到这一点,您必须研究最适合您的方法)。但我认为你会想要利用 Java 的漂亮选项在出现内存不足错误时进行堆转储。这样,您更有可能肯定地确定罪魁祸首。请参阅 JVM 的 https://docs.oracle.com/javase/7/docs/webnotes/tsg/TSG-VM/html/clopts.html#gbzrr 或类似内容。
我可以推测堆转储的原因,但我担心我可能会让你误入歧途,浪费你的时间。获得转储结果后,我认为您应该继续在 Kafka 问题跟踪器中打开错误报告:https://issues.apache.org/jira/projects/KAFKA 。然后,我们可以帮助您弄清楚如何解决该错误以使您再次运行,以及如何在将来的版本中修复它。
实际上,我将提供一个猜测...您可能会看到此错误的结果:https://github.com/apache/kafka/pull/6536 (https://issues.apache.org/jira/browse/KAFKA-7895) 。如果在删除抑制运算符时 OOME 消失,则可能需要暂时将其排除在外。一旦我们合并了修复程序,我就会请求发布错误修复程序,您可以重试以查看问题是否已解决。
KTABLE-SUPPRESS-STATE-STORE的密钥由哪个API定义,我如何或应该控制它?
幸运的是,这有一个更直接的答案。您正在查看的键是记录键和窗口时间戳的二进制打包版本。此密钥是您使用windowBy
的结果。在 Java 中,您可以看到聚合的结果是KTable<Windowed<String>, ...>
,并且抑制不会更改键或值类型。换句话说,您正在查看密钥的序列化版本(Windowed<String>
)。
暂时搁置压制;假设您有两个序列号,"asdf"和"zxcv"。假设您的窗口大小为一小时。您的应用程序在一天中的每个小时(独立)对每个序列号的事件进行分组。因此,从 9:00 到 10:00 的所有"asdf"记录都有一个聚合,从 9:00 到 10:00 的所有"zxcv"记录也有一个聚合。因此,窗口 KTable 中的键总数为key space
xnumber of windows being retained
。
抑制运算符对 KTable 中的键数没有影响。其目的是在指定的时间量 (timeToWait
) 内禁止对这些密钥的更新。例如,如果不进行抑制,如果在 9:00 到 10:00 之间对"asdf"记录进行 3 次更新,则窗口聚合每次都会发出(asdf, 9:00)
个更新的结果,因此对于 3 个事件,您会看到 3 个结果更新出来。Suppress 运算符只是阻止这些结果更新,直到timeToWait
通过,当它通过时,它只发出最新的更新。
因此,任何时候抑制缓冲区中的键数都小于上游 KTable 中的键总数。它只包含在过去timeToWait
时间内更新的密钥。
这有帮助吗?