Kafka版本:0.10.2.1,
Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: 6686 ms has passed since batch creation plus linger time
org.apache.kafka.common.errors.TimeoutException: Expiring 10 record(s) for TOPIC:XXXXXX: 6686 ms has passed since batch creation plus linger time
之所以发生此异常,是因为对记录进行排队的速度比发送记录的速度快得多。
当您调用send方法时,ProducerRecord将存储在内部缓冲区中,以便发送到broker。缓冲ProducerRecord后,无论是否发送,该方法都会立即返回。
记录被分组成批发送到代理,以减少每条消息的传输开销并提高吞吐量。
将记录添加到批中后,发送该批是有时间限制的,以确保在指定的持续时间内发送。这由Producer配置参数request.timeout.ms控制,默认值为30秒。参见相关答案
如果批处理的排队时间超过了超时限制,则会引发异常。该批中的记录将从发送队列中删除。
生产者配置块.on.buffer.full、metadata.fetch.timeout.ms和timeout.ms已被删除。它们最初在Kafka 0.9.0.0中被弃用。
因此,尝试增加请求超时.ms
尽管如此,如果您有任何与吞吐量相关的问题,您也可以参考以下博客
当代理/主题/分区无法与生产商联系或生产商在队列前超时时,就会出现此问题。
我发现,即使是一个现场经纪人,你也会遇到这个问题。在我的案例中,主题分区领导者指向了不活动的代理ID。要解决这个问题,您必须将这些领导者迁移到活动的代理。
对受影响的主题使用主题重新分配工具。主题迁移:https://kafka.apache.org/21/documentation.html#basic_ops_automigrate
我收到了同样的消息,我修复了它,从zookeeper中清除了kafka数据。之后它就开始工作了。
我在aks集群中也遇到过同样的问题,只是重新启动kafka和zookeeper服务器就解决了这个问题。
对于KAFKA DOCKER案例
花了很多时间了解发生了什么,包括更改server.properties
、producer.properties
和我的代码(Eclipse)。这对我不起作用(我从笔记本电脑向Linux服务器上的Kafka Docker发送消息)
我清理了卡夫卡和动物园管理员,并通过docker compose.yml(我是新手)重新安装了它们。请查看我的docker-compose.yml
文件,并按照我如何将这些IP更改为Linux服务器的IP
bitnami/kafka
bitnami/kafka
到。。。
bitnami改变
而10.5.1.30是我的Linux服务器的IP地址
wurstmeister kafka
wurstmeister
之后,我运行了我的代码,结果是:
结果
完整代码:
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
try {
String topicName = "demo";
Properties props = new Properties();
props.put("bootstrap.servers", "10.5.1.30:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>(topicName, "Eclipse3"));
System.out.println("Message sent successfully, total of message is: " + f.get().toString());
producer.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("Successful");
}
}
希望能有所帮助。和平
假设一个主题有100个分区(0-99)。Kafka允许您通过指定特定分区来生成主题的记录。面临的问题是,我试图生成分区>99,因为代理拒绝这些记录。
我们什么都试过了,但没有成功。
- 减少了生产商的批量大小,增加了请求时间。timeout.ms
- 重新启动目标卡夫卡集群,仍然没有运气
- 已检查目标kafka群集上的复制,该群集也运行良好
- 在prodcuer属性中添加了retries.backout.ms
- 在kafka prodcuer属性中添加了linger.time
最后,我们的案例中出现了kafka集群本身的问题,我们无法从2台服务器中获取介于两者之间的元数据。
当我们将目标kafka集群更改为我们的dev-box时,它运行良好。