Flink, Kafka and Zookeeper with an URI



我正试图从本地机器连接到Kafka

kafkaParams.setProperty("bootstrap.servers", Defaults.BROKER_URL)
kafkaParams.setProperty("metadata.broker.list", Defaults.BROKER_URL)
kafkaParams.setProperty("group.id", "group_id")
kafkaParams.setProperty("auto.offset.reset", "earliest")

非常好,但我的BROKER_URI定义如下my-server.com:1234/my/subdirectory

我发现这种现象被称为chroot路径。

它抛出以下错误:Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: my-server.com:1234/my/subdirectory

我该如何解决此问题?

这些是我的依赖项:

val flinkVersion = "1.0.3"
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion,

只需尝试不带路径上下文和斜杠的host:port格式。如果你有多个服务器,它将是一个列表host1:port1,host2:port2

参考:http://kafka.apache.org/documentation.html

bootstrap.servers应该是一个逗号分隔的列表,如下所示:address1:port1,address2:port2,...,addressn:portn。如果您只有一个Kafka代理,则应该输入类似localhost:9092的内容(除非您将Kafka配置为在另一个端口上运行)。

你可以参考dataArtisans的这篇文章,了解如何让Flink和Kafka合作的更多细节。

愚蠢。动物园管理员!=卡夫卡。正如你在代码中看到的,我使用了两次相同的URL,但结果是它们应该不同。

我正试图从本地机器连接到Kafka

kafkaParams.setProperty("bootstrap.servers", Defaults.KAFKA_URL)
kafkaParams.setProperty("metadata.broker.list", Defaults.ZOOKEEPER_URL)
kafkaParams.setProperty("group.id", "group_id")
kafkaParams.setProperty("auto.offset.reset", "earliest")

相关内容

  • 没有找到相关文章

最新更新