我正在做一些Kafka流集合,并将汇总记录写入主题并获取以下错误。我正在使用自定义JSON SERDE进行聚合助手类。我在某些博客上发现的解决方案是为了增加Max.Request.Size。
尽管我将最大尺寸从默认值增加到401391899,但序列化的聚合消息大小在随后的主题上的写入中不断增加。
在10分钟后运行流,以下错误显示。不确定我的SERDE是否存在问题,还是我应该更改除Max.Request.size以外的任何配置设置以克服此问题。
写信给主题的消息;
{A=5, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=6, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=7, B=1, C=2, D=87, E=1, F=0.4482758620689655 }
org.apache.kafka.common.errors.recordtoolargeexception:序列化时该消息为2292506字节,该序列化大于使用Max.Request配置的最大请求大小。
Exception in thread "StreamThread-1" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at com.google.gson.stream.JsonReader.nextString(JsonReader.java:1043)
at com.google.gson.stream.JsonReader.nextValue(JsonReader.java:784)
at com.google.gson.stream.JsonReader.nextInArray(JsonReader.java:693)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:376)
at com.google.gson.stream.JsonReader.hasNext(JsonReader.java:349)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:93)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:172)
at com.google.gson.Gson.fromJson(Gson.java:795)
at com.google.gson.Gson.fromJson(Gson.java:761)
at com.google.gson.Gson.fromJson(Gson.java:710)
at com.google.gson.Gson.fromJson(Gson.java:682)
at com.data.agg.streams.JsonDeserializer.deserialize(JsonDeserializer.java:34)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:156)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:222)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:205)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:149)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
[Thread-2] INFO org.apache.kafka.streams.KafkaStreams - Stopped Kafka Stream process
这是黑暗中的镜头,因为您没有共享代码。
由于状态得到了kafka主题的支持,因此流式传输将记录写入该主题(每个键的记录,其值是属于密钥的状态(。随着时间的流逝,您声明(每键(会随着时间的流逝而增长,随着时间的流逝,"状态记录"会增长,并最终超过最大尺寸。