org.apache.kafka.common.errors.RecordTooLargeException



I am doing some Kafka Streams aggregation, writing the aggregated records to a topic, and getting the error below. I am using a custom JSON SerDe for the aggregation helper class. The fix I found on a few blogs was to increase max.request.size.

Even though I increased max.request.size from its default to 401391899, the size of the serialized aggregate message keeps growing on every subsequent write to the topic.
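For reference, this is roughly how I pass the producer override to Kafka Streams (the class name, application id, and broker address below are placeholders, not my actual values):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class AggStreamConfig {
        public static Properties build() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "agg-app");           // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Raise the internal producer's max.request.size (default is 1048576 bytes).
            // Kafka Streams forwards producer-level settings like this one to its producer.
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 401391899);
            return props;
        }
    }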

After the stream has been running for about 10 minutes, the error below shows up. I am not sure whether there is a problem in my SerDe, or whether I should change some configuration setting other than max.request.size to get past this.

Messages written to the topic:

{A=5, B=1, C=0, D=87, E=1, F=0.4482758620689655 }   
{A=6, B=1, C=0, D=87, E=1, F=0.4482758620689655 }  
{A=7, B=1, C=2, D=87, E=1, F=0.4482758620689655 }

org.apache.kafka.common.errors.RecordTooLargeException: The message is 2292506 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Exception in thread "StreamThread-1" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at com.google.gson.stream.JsonReader.nextString(JsonReader.java:1043)
    at com.google.gson.stream.JsonReader.nextValue(JsonReader.java:784)
    at com.google.gson.stream.JsonReader.nextInArray(JsonReader.java:693)
    at com.google.gson.stream.JsonReader.peek(JsonReader.java:376)
    at com.google.gson.stream.JsonReader.hasNext(JsonReader.java:349)
    at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80)
    at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
    at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:93)
    at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:172)
    at com.google.gson.Gson.fromJson(Gson.java:795)
    at com.google.gson.Gson.fromJson(Gson.java:761)
    at com.google.gson.Gson.fromJson(Gson.java:710)
    at com.google.gson.Gson.fromJson(Gson.java:682)
    at com.data.agg.streams.JsonDeserializer.deserialize(JsonDeserializer.java:34)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:156)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
    at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:222)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:205)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:149)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
[Thread-2] INFO org.apache.kafka.streams.KafkaStreams - Stopped Kafka Stream process

This is a shot in the dark, since you have not shared your code.

Because the state is backed by a Kafka topic, Kafka Streams writes records to that topic (one record per key, whose value is the state belonging to that key). Since your state (per key) grows over time, the "state record" grows over time as well and eventually exceeds the maximum size.
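For illustration only, here is a minimal sketch of the kind of aggregation that shows this symptom (class, topic, and store names are made up, and the API shapes follow the 0.10.x line visible in your stack trace). The aggregator keeps appending input to the per-key state, so the state record for a key grows without bound:

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class GrowingStateSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "growing-state-demo"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> input =
                builder.stream(Serdes.String(), Serdes.String(), "input-topic");  // placeholder topic

            // The aggregator appends every incoming value to the per-key state, so the
            // single state record for a key grows on every update; sooner or later it no
            // longer fits into one produce request to the changelog/output topic.
            input.groupByKey(Serdes.String(), Serdes.String())
                 .aggregate(
                     () -> "",                          // initializer: empty state
                     (key, value, agg) -> agg + value,  // state only grows, never shrinks
                     TimeWindows.of(TimeUnit.MINUTES.toMillis(10)),
                     Serdes.String(),
                     "growing-store");

            new KafkaStreams(builder, props).start();
        }
    }

If your aggregate behaves like this, raising max.request.size only delays the failure: an unbounded per-key state will eventually exceed whatever limit you configure.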
