Kafka Producer错误TOPIC:XXXXXX:自批创建以来已超过6686毫秒的10条记录过期加上延迟时间



Kafka版本:0.10.2.1,

Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: 6686 ms has passed since batch creation plus linger time
org.apache.kafka.common.errors.TimeoutException: Expiring 10 record(s) for TOPIC:XXXXXX: 6686 ms has passed since batch creation plus linger time

之所以发生此异常,是因为对记录进行排队的速度比发送记录的速度快得多。

当您调用send方法时,ProducerRecord将存储在内部缓冲区中,以便发送到broker。缓冲ProducerRecord后,无论是否发送,该方法都会立即返回。

记录被分组成批发送到代理,以减少每条消息的传输开销并提高吞吐量。

将记录添加到批中后,发送该批是有时间限制的,以确保在指定的持续时间内发送。这由Producer配置参数request.timeout.ms控制,默认值为30秒。参见相关答案

如果批处理的排队时间超过了超时限制,则会引发异常。该批中的记录将从发送队列中删除。

生产者配置块.on.buffer.full、metadata.fetch.timeout.ms和timeout.ms已被删除。它们最初在Kafka 0.9.0.0中被弃用。

因此,尝试增加请求超时.ms

尽管如此,如果您有任何与吞吐量相关的问题,您也可以参考以下博客

当代理/主题/分区无法与生产商联系或生产商在队列前超时时,就会出现此问题。

我发现,即使是一个现场经纪人,你也会遇到这个问题。在我的案例中,主题分区领导者指向了不活动的代理ID。要解决这个问题,您必须将这些领导者迁移到活动的代理。

对受影响的主题使用主题重新分配工具。主题迁移:https://kafka.apache.org/21/documentation.html#basic_ops_automigrate

我收到了同样的消息,我修复了它,从zookeeper中清除了kafka数据。之后它就开始工作了。

我在aks集群中也遇到过同样的问题,只是重新启动kafka和zookeeper服务器就解决了这个问题。

对于KAFKA DOCKER案例

花了很多时间了解发生了什么,包括更改server.propertiesproducer.properties和我的代码(Eclipse)。这对我不起作用(我从笔记本电脑向Linux服务器上的Kafka Docker发送消息)

我清理了卡夫卡和动物园管理员,并通过docker compose.yml(我是新手)重新安装了它们。请查看我的docker-compose.yml文件,并按照我如何将这些IP更改为Linux服务器的IP

bitnami/kafka

bitnami/kafka

到。。。

bitnami改变

而10.5.1.30是我的Linux服务器的IP地址

wurstmeister kafka

wurstmeister

之后,我运行了我的代码,结果是:

结果

完整代码:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
try {
String topicName = "demo";
Properties props = new Properties();
props.put("bootstrap.servers", "10.5.1.30:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>(topicName, "Eclipse3"));
System.out.println("Message sent successfully, total of message is: " + f.get().toString());
producer.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("Successful");
}
}

希望能有所帮助。和平

假设一个主题有100个分区(0-99)。Kafka允许您通过指定特定分区来生成主题的记录。面临的问题是,我试图生成分区>99,因为代理拒绝这些记录。

我们什么都试过了,但没有成功。

  1. 减少了生产商的批量大小,增加了请求时间。timeout.ms
  2. 重新启动目标卡夫卡集群,仍然没有运气
  3. 已检查目标kafka群集上的复制,该群集也运行良好
  4. 在prodcuer属性中添加了retries.backout.ms
  5. 在kafka prodcuer属性中添加了linger.time

最后,我们的案例中出现了kafka集群本身的问题,我们无法从2台服务器中获取介于两者之间的元数据。

当我们将目标kafka集群更改为我们的dev-box时,它运行良好。

最新更新