Missing required configuration "bootstrap.servers" error in the standard Spark Streaming example



I'm fairly new to Scala and Spark, so feel free to judge me, but not too harshly.

I'm trying to run the standard DirectKafkaWordCount example (shipped with the Spark2 installation) to test how Spark Streaming works with Kafka.

Here is the code of the example (it can also be found here):

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: DirectKafkaWordCount <brokers> <topics>
*   <brokers> is a list of one or more Kafka brokers
*   <topics> is a list of one or more kafka topics to consume from
*
* Example:
*    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port 
*    topic1,topic2
*/
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

To get it to run, I had to put spark-streaming-kafka-0-10_2.11-2.3.1.jar and kafka-clients-0.10.0.1.jar into the /usr/hdp/3.0.0.0-1634/spark2/jars/ directory (which surprised me a bit, since I assumed all the standard examples shipped with the installation would work out of the box, yet the WordCount example complained about these missing packages). After adding those jars, I tried to read records from the topic test and count words with the command

/usr/hdp/3.0.0.0-1634/spark2/bin/run-example streaming.DirectKafkaWordCount localhost:9092 test

However, the application fails, and the error I get looks like this:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:421)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:376)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.examples.streaming.DirectKafkaWordCount$.main(DirectKafkaWordCount.scala:70)
at org.apache.spark.examples.streaming.DirectKafkaWordCount.main(DirectKafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This puzzles me, since I do provide the bootstrap server (localhost:9092) in the launch command. Any ideas where to dig from here?

My setup:

Spark - 2.3.1

Kafka - 2.11-1.0.1

You need to add bootstrap.servers to the Kafka params, because the consumer from spark-streaming-kafka-0-10_2.11-2.3.1.jar needs the bootstrap servers in order to consume messages from any topic:

val kafkaParams = Map[String, Object]("bootstrap.servers" -> "alpha-kafka-1.com:9092,alpha-kafka-2.com:9092,alpha-kafka-3.com:9092")

Resource: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream
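Note that bootstrap.servers alone is not everything the 0-10 consumer expects: per the integration guide linked above, it also needs key/value deserializers and a group id. A minimal sketch of the params block for this example (the group id value is a placeholder):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,                      // replaces "metadata.broker.list"
  "key.deserializer" -> classOf[StringDeserializer],   // required by the 0.10 consumer
  "value.deserializer" -> classOf[StringDeserializer], // required by the 0.10 consumer
  "group.id" -> "direct-kafka-word-count"              // placeholder group id
)
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))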

The example hasn't been updated in over a year, but it seems you need to rename metadata.broker.list to bootstrap.servers, which is the property name that all the other Kafka clients use.

In any case, I'm not sure whether the run-example script passes the arguments through correctly, but you need to provide the external IP or hostname of your Kafka broker rather than localhost.

Also, Structured Streaming and the DataFrame API are recommended over DStreams and RDDs in Spark 2+.
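For illustration, a minimal sketch of the same word count with Structured Streaming (it assumes the spark-sql-kafka-0-10 package is on the classpath; broker and topic are the ones from the question):

import org.apache.spark.sql.SparkSession

object StructuredKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
    import spark.implicits._

    // Read from Kafka; note the "kafka." prefix on bootstrap.servers here
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Same word count, expressed over an unbounded DataFrame
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}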

Just in case: if you are using Kafka with Spring Boot and you run into this error

org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

make sure you have the following in place (a minimal properties sketch follows the list):

  1. spring.kafka.bootstrap-servers is set in your properties or YAML file.
  2. ZooKeeper and the Kafka server are running.
  3. The consumer is running, via the kafka-console-consumer.bat/sh command (depending on your OS).
  4. spring.kafka.consumer.group-id is set.
  5. spring.kafka.consumer.auto-offset-reset=earliest
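
A minimal application.properties sketch covering points 1, 4 and 5 (host, port and group id are placeholders):

# placeholder broker address and group id - adjust to your environment
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest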

Hope this helps someone.

Thanks,

Atul
