为什么 spark-submit 找不到 kafka 数据源,除非使用 --packages?



我正在尝试将Kafka集成到我的Spark应用程序中,这是我的POM文件所需的条目:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.stream.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>

相应的工件版本为:

<kafka.version>0.10.2.0</kafka.version>
<spark.stream.kafka.version>2.2.0</spark.stream.kafka.version>

我一直在挠头:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html

我还尝试为jar提供--jars参数,但是它没有帮助。我在这里错过了什么?

法典:

private static void startKafkaConsumerStream() {
Dataset<HttpPackage> ds1 = _spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
.option("subscribe", HTTP_FED_VO_TOPIC)
.load() // Getting the error here
.as(Encoders.bean(HttpPackage.class));
ds1.foreach((ForeachFunction<HttpPackage>)  req ->System.out.print(req));
}

_spark定义为:

_spark = SparkSession
.builder()
.appName(_properties.getProperty("app.name"))
.config("spark.master", _properties.getProperty("master"))
.config("spark.es.nodes", _properties.getProperty("es.hosts"))
.config("spark.es.port", _properties.getProperty("es.port"))
.config("spark.es.index.auto.create", "true")
.config("es.net.http.auth.user", _properties.getProperty("es.net.http.auth.user"))
.config("es.net.http.auth.pass", _properties.getProperty("es.net.http.auth.pass"))
.getOrCreate();

我的导入是:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

但是,当我运行此处提到的代码并且带有包选项时:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

它有效

Spark Structured Streaming 使用 external kafka-0-10-sql 模块支持 Apache Kafka 作为流源和接收器。

kafka-0-10-sql模块不适用于使用spark-submit提交以执行的 Spark 应用程序。该模块是外部的,要使其可用,您应该将其定义为依赖项。

除非您在 Spark 应用程序中使用特定于模块kafka-0-10-sql代码,否则您不必在pom.xml中将模块定义为dependency。您根本不需要对模块的编译依赖,因为没有代码使用模块的代码。你针对接口编写代码,这是Spark SQL如此令人愉快使用的原因之一(即它只需要很少的代码就可以拥有相当复杂的分布式应用程序)。

但是spark-submit将需要--packages命令行选项,您报告它工作正常。

但是,当我运行此处提到的代码并且带有包选项时:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

它与--packages配合使用良好的原因是,您必须告诉Spark基础架构在哪里可以找到kafka格式的定义。

这导致我们进入另一个"问题"(或要求)使用Kafka运行流式Spark应用程序。您必须指定spark-sql-kafka模块的运行时依赖关系

您可以使用命令行选项(在spark-submitSpark 应用程序后下载必要的 jar)或创建所谓的 uber-jar(或 fat-jar)来指定运行时依赖项--packages

这就是pom.xml发挥作用的地方(这就是为什么人们在pom.xml和模块方面提供帮助的原因dependency)。

因此,首先,您必须指定pom.xml中的依赖项。

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>

最后但并非最不重要的一点是,您必须构建一个超级jar,您可以使用Apache Maven Shade插件在pom.xml中配置该jar。

使用 Apache Maven Shade 插件,您可以创建一个 Uber JAR,它将在 Spark 应用程序 jar 文件中包含kafka格式工作的所有"基础设施"。事实上,Uber JAR 将包含所有必要的运行时依赖项,因此您可以单独使用 jarspark-submit(没有--packages选项或类似选项)。

将下面的依赖项添加到pom.xml文件中。

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>

更新依赖项和版本。下面给定的依赖项应该可以正常工作:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>

PS:请注意在前两个依赖项中提供了范围。

相关内容

  • 没有找到相关文章

最新更新