`java.io.EOFException: null` when listing topics in Kafka



I am simply trying to use the listTopics() method of the AdminClient class with the kafka-clients 2.7.0 Java API. I run Kafka and ZooKeeper as Docker containers, with the following docker-compose configuration:

version: '3.4'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    image: debezium/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_LISTENERS=PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:29092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    depends_on:
      - zookeeper

I create the AdminClient like this:

public MyAdminClient() {
    Properties props = new Properties();
    String bootstrapServers = "localhost:9092";
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "540000");
    myAdmin = AdminClient.create(props);
}

Then I try to list the topics using the myAdmin object created above:

ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true); // include internal topics such as __consumer_offsets
KafkaFuture<Set<String>> topicsFuture = myAdmin.listTopics(options).names();
Set<String> topics = topicsFuture.get(); // blocks until the broker answers or the call times out

for (String topic : topics) {
    System.out.println("[Kafka topic]: " + topic);
}

However, I get a java.io.EOFException: null that is thrown over and over in a loop:

18:19:17.404 [main] DEBUG org.apache.kafka.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-1] Queueing Call(callName=listTopics, deadlineMs=1618504097402, tries=0, nextAllowedTryMs=0) with a timeout 540000 ms from now.
18:19:17.418 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
18:19:17.447 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.network.Selector - [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
18:19:17.961 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Completed connection to node -1. Fetching API versions.
18:19:17.962 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Initiating API versions fetch from node -1.
18:19:17.998 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=adminclient-1, correlationId=0) and timeout 3600000 to node -1: {client_software_name=apache-kafka-java,client_software_version=2.7.0,_tagged_fields={}}
18:19:18.024 [kafka-admin-client-thread | adminclient-1] DEBUG org.apache.kafka.common.network.Selector - [AdminClient clientId=adminclient-1] Connection with localhost/127.0.0.1 disconnected
java.io.EOFException: null
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:97)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)

I was able to resolve this by making KAFKA_LISTENERS point to the Kafka broker host kafka instead of localhost. KAFKA_ADVERTISED_LISTENERS stays as it was:

  • KAFKA_LISTENERS=PLAINTEXT_HOST://kafka:9092,PLAINTEXT://kafka:29092
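
A likely explanation for why this helps: a listener bound to localhost inside the container only accepts connections on the container's own loopback interface, so connections arriving through the published port 9092 are accepted by Docker's port forwarding but never reach the broker, and the client sees the socket close (the EOFException). The sketch below is not part of Kafka or of the original answer. It is a small stand-alone check, using the hypothetical names ListenerCheck, parse and loopbackBound, that flags any entry in a KAFKA_LISTENERS value whose host is a loopback name or address:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not a Kafka API.
final class ListenerCheck {

    /** Parses "NAME://host:port,NAME2://host:port" into name -> "host:port", keeping order. */
    static Map<String, String> parse(String listeners) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf("://");
            if (sep < 0) {
                throw new IllegalArgumentException("Expected NAME://host:port but got: " + trimmed);
            }
            byName.put(trimmed.substring(0, sep), trimmed.substring(sep + 3));
        }
        return byName;
    }

    /** Returns the names of listeners whose host is "localhost" or a 127.x.x.x address. */
    static List<String> loopbackBound(String listeners) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, String> e : parse(listeners).entrySet()) {
            String hostPort = e.getValue();
            int colon = hostPort.lastIndexOf(':');
            String host = colon < 0 ? hostPort : hostPort.substring(0, colon);
            if (host.equals("localhost") || host.startsWith("127.")) {
                names.add(e.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Values taken from the question (before) and the fix (after).
        String before = "PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:29092";
        String after = "PLAINTEXT_HOST://kafka:9092,PLAINTEXT://kafka:29092";
        System.out.println(loopbackBound(before)); // [PLAINTEXT_HOST]
        System.out.println(loopbackBound(after));  // []
    }
}
```

Only the bind address changes in the fix. The advertised address localhost:9092 is still what a client on the Docker host should connect to, because that is where the published port is reachable.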
