How to fix the Java Kafka producer error "Received invalid metadata error in produce request on partition" and an OutOfMemoryError when a broker goes down



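I have been building a sample Kafka producer in Java. It sends plain data to Kafka where each value is just "Test" + an integer. I use the properties shown below. After I start the producer client and messages are on their way, I kill the brokers, and instead of retrying, the producer suddenly fails with the error message below.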

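The setup uses 3 brokers and a topic with 3 partitions, a replication factor of 3, and no minimum in-sync replicas (min.insync.replicas) configured.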

These are the configured properties:

    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    config.put(CommonClientConfigs.RETRIES_CONFIG, 60);
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
    config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
    config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    config.put(ProducerConfig.LINGER_MS_CONFIG, 0);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1073741824); // 1 GB

This is the error I get when I kill all the brokers, or sometimes when I kill just one of them:

**Error:**
    WARN org.apache.kafka.clients.producer.internals.Sender  - [Producer clientId=producer-1] Got error produce response with correlation id 124 on topic-partition testing001-0, retrying (59 attempts left). Error: NETWORK_EXCEPTION
    27791 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender  - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition testing001-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
    28748 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.common.utils.KafkaThread  - Uncaught exception in thread 'kafka-producer-network-thread | producer-1':
    java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(Unknown Source)
        at java.nio.ByteBuffer.allocate(Unknown Source)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Unknown Source)

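I assume you are testing the producer. When the producer connects to the Kafka cluster, you pass the broker IPs and ports as a comma-separated string; in your case there are three brokers. As part of initialization, the producer contacts one of those brokers, which responds with the cluster metadata. Let's assume your producer only publishes messages to a single topic. For each partition of that topic, the cluster keeps one of the brokers as the leader. Once the producer knows the leader of a partition, it talks only to that leader for as long as the leader stays online.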
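As a minimal sketch of the comma-separated broker list mentioned above, the snippet below builds the value for the `bootstrap.servers` setting from a list of `host:port` entries. The class name, method names, and the broker addresses are hypothetical and only serve as illustration; the real addresses depend on your cluster. It uses plain `java.util.Properties` with string keys so it runs without the Kafka client on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class BootstrapServersExample {
    // Joins host:port entries into the comma-separated form used by "bootstrap.servers".
    static String joinBrokers(List<String> brokers) {
        return String.join(",", brokers);
    }

    // Builds a small base configuration; only the broker list and acks are set here.
    static Properties baseConfig(List<String> brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", joinBrokers(brokers));
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical addresses for the three brokers in the test cluster.
        List<String> brokers = Arrays.asList("broker1:9092", "broker2:9092", "broker3:9092");
        Properties props = baseConfig(brokers);
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

Listing all three brokers means the producer can still bootstrap from the remaining entries if one of them is down at startup.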

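In your test scenario you are deliberately killing broker instances. When that happens, the Kafka cluster has to identify a new leader for the affected partitions of your topic, and the new metadata has to reach your producer. If the metadata changes frequently (in your case, you may be killing another broker in the meantime), the producer may end up working with invalid metadata.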
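The failover described above can be pictured with a toy model. This is not Kafka's actual election logic, only a simplified sketch that picks the first replica in the list that is still alive; the class and method names are hypothetical. It shows why the leader the producer remembers becomes stale when a broker dies, and why no leader exists at all once every broker has been killed.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class LeaderFailoverSketch {
    // Toy election: returns the first replica that is still alive, or -1 if none is.
    static int electLeader(List<Integer> replicas, Set<Integer> liveBrokers) {
        for (int broker : replicas) {
            if (liveBrokers.contains(broker)) {
                return broker;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // One partition replicated on brokers 1, 2 and 3 (replication factor 3).
        List<Integer> replicas = Arrays.asList(1, 2, 3);
        Set<Integer> live = new HashSet<>(replicas);

        System.out.println("initial leader: " + electLeader(replicas, live));

        // Killing the current leader makes the producer's cached leader stale.
        live.remove(1);
        System.out.println("leader after broker 1 is killed: " + electLeader(replicas, live));

        // With every broker down there is no leader to send to.
        live.clear();
        System.out.println("leader with all brokers down: " + electLeader(replicas, live));
    }
}
```

In this model a producer that cached the initial leader has to refresh its view after each kill, which corresponds to the "Going to request metadata update now" message in the log.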
