KAFKA 1.0.0管理员客户端无法使用EofException创建主题



使用1.0.0 kafka admin客户端我希望在代理上编程创建一个主题。我碰巧正在使用Scala。我尝试使用以下代码在Kafka经纪人上创建一个主题,或者只是列出可用主题

import org.apache.kafka.clients.admin.{AdminClient, ListTopicsOptions, NewTopic}
import scala.collection.JavaConverters._
val zkServer = "localhost:2181"
val topic = "test1"
val zookeeperConnect = zkServer
val sessionTimeoutMs = 10 * 1000
val connectionTimeoutMs = 8 * 1000

val partitions = 1
val replication:Short = 1
val topicConfig = new Properties() // add per-topic configurations settings here
import org.apache.kafka.clients.admin.AdminClientConfig
val config = new Properties
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, zkServer)
val admin = AdminClient.create(config)
val existing = admin.listTopics(new ListTopicsOptions().timeoutMs(500).listInternal(true))
val nms = existing.names()
nms.get().asScala.foreach(nm => println(nm)) // nms.get() fails
val newTopic = new NewTopic(topic, partitions, replication)
newTopic.configs(Map[String,String]().asJava)
val ret = admin.createTopics(List(newTopic).asJavaCollection)
ret.all().get() // Also fails
admin.close()

使用任何一个命令,Zookeeper(3.4.10)侧抛出EofException并关闭连接。调试Zookeeper端本身,看来它无法应对管理客户端正在发送的消息(它运行于试图读取的字节)

任何人都可以使1.0.0 Kafka Admin客户端工作以创建或列出主题?

addinclient直接连接到kafka,不需要访问Zookeeper。

您需要设置AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG以指向您的Kafka经纪人(例如localhost:9092),而不是Zookeeper。

最新更新