我正在测试一个容器化的kafka实例,我用下面的命令创建了它:
docker run -d --name kafkacontainer -p 9093:9093
-e KAFKA_BROKER_ID=1
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
-e KAFKA_ZOOKEEPER_CONNECT=172.17.0.2:2181
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://mytestvm:9093,BROKER://localhost:9092
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092
-e KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT
-e KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
confluentinc/cp-kafka:7.0.1
当我添加一个主题,向该主题发送消息,并从该主题读取消息时,实例工作正常。
然后,我尝试用以下代码创建一个事务:((KafkaProducer<?, ?>) producer).getInternalKafkaProducer().initTransactions();
在挂起60秒后抛出以下异常:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId
在谷歌搜索之后,我发现很多帖子都认为这个错误的发生是因为交易需要特殊的配置。也就是说,必须有多个代理和多个同步复制器(ISR)。所以我试着在下面配置这个设置对,但无济于事:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
我试过1/1,3/2。为了解决这个超时错误,它们应该是什么?
如果你想连接到外部容器,你需要在docker容器中发布外部端口。请在您的容器中添加-p 9092:9092。