容器化 kafka - 由于"Timeout expired after 60000 milliseconds while awaiting InitProducerId"无法创建事务



我正在测试一个容器化的kafka实例,我用下面的命令创建了它:

docker run -d --name kafkacontainer -p 9093:9093 
-e KAFKA_BROKER_ID=1
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 
-e KAFKA_ZOOKEEPER_CONNECT=172.17.0.2:2181 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://mytestvm:9093,BROKER://localhost:9092 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092 
-e KAFKA_INTER_BROKER_LISTENER_NAME=BROKER 
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT 
-e KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1 
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 
confluentinc/cp-kafka:7.0.1

当我添加一个主题,向该主题发送消息,并从该主题读取消息时,实例工作正常。

然后,我尝试用以下代码创建一个事务:
((KafkaProducer<?, ?>) producer).getInternalKafkaProducer().initTransactions();

在挂起60秒后抛出以下异常:

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId

在谷歌搜索之后,我发现很多帖子都认为这个错误的发生是因为交易需要特殊的配置。也就是说,必须有多个代理和多个同步复制器(ISR)。所以我试着在下面配置这个设置对,但无济于事:

KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 

我试过1/1,3/2。为了解决这个超时错误,它们应该是什么?

如果你想连接到外部容器,你需要在docker容器中发布外部端口。请在您的容器中添加-p 9092:9092。

相关内容

最新更新