如何使用 Segmentio 的 kafka-go 创建 Kafka 主题?



我能得到一个使用segmentio的kafka-go创建主题的例子吗?

我尝试创建一个主题如下:

c, _ := kafka.Dial("tcp", "host:port")
kt := kafka.TopicConfig{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}
e := c.CreateTopics(kt)

但只有当给定的主机:端口是Kafka Leader时,这才有效。如果主机:端口不是Kafka Leader,那么我将得到以下错误:

不是控制器:这不是该集群*的正确控制器

传递集群地址以创建主题的正确方法是什么?

Kafka Segmentio:github.com/Segmentio/Kafka-go

这就是您所需要的:

func (c *Conn) Controller() (broker Broker, err error)
// Controller requests kafka for the current controller and returns its URL

当您使用Dial在代码中打开连接时,您将随机选择群集中的一个代理。因此,您可能/可能不会最终使用实际的Kafka控制器。简单查找控制器并打开新的连接应该会有所帮助。

https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#Conn.Controller

正如shmsr所说,您需要获得Leader连接才能创建主题。你可以通过以下方式做到这一点:

conn, err := kafka.Dial("tcp", "host:port")
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}

相关内容

  • 没有找到相关文章

最新更新