Spark Streaming 2.4.0 getting org.apache.spark.sql.AnalysisException: Failed to find data source


I get the following error when trying to read data from Kafka. I am using docker compose to run both Kafka and Spark.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

Here is my code for reading from Kafka:

import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.SparkSession

object Livedata extends App with LazyLogging {
  logger.info("starting livedata...")

  val spark = SparkSession.builder().appName("livedata").master("local[*]").getOrCreate()

  // Read the Kafka topic as a streaming DataFrame
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "topic")
    .option("startingOffsets", "latest")
    .load()

  df.printSchema()

  val hadoopConfig = spark.sparkContext.hadoopConfiguration
  hadoopConfig.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  hadoopConfig.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
}
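(For context, the snippet above only defines the streaming DataFrame; nothing is pulled from Kafka until a streaming query is started. A minimal sketch of how the stream could be consumed once the kafka source resolves is below; the console sink and checkpoint location are illustrative assumptions, not part of my actual job.)

// Hypothetical continuation: start a streaming query so Spark actually reads from Kafka.
// The console sink and checkpoint path below are just placeholders.
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/livedata-checkpoint")
  .outputMode("append")
  .start()

query.awaitTermination()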

After reading some answers, I added all the packages to the sbt build.

Here is the build.sbt file:

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.live.data",
      version      := "0.1.0",
      scalaVersion := "2.12.2",
      assemblyJarName in assembly := "livedata.jar"
    )),
    name := "livedata",
    libraryDependencies ++= List(
      "org.scalatest"              %% "scalatest"             % "3.0.5",
      "com.typesafe.scala-logging" %% "scala-logging"         % "3.9.0",
      "org.apache.spark"           %% "spark-sql"             % "2.4.0",
      "org.apache.spark"           %% "spark-sql-kafka-0-10"  % "2.4.0" % "provided",
      "org.apache.kafka"           %  "kafka-clients"         % "2.5.0",
      "org.apache.kafka"           %  "kafka-streams"         % "2.5.0",
      "org.apache.kafka"           %% "kafka-streams-scala"   % "2.5.0"
    )
  )

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}

I am not sure what the main issue is here.

Update:

In the end I got the solution from here: Error connecting Spark Structured Streaming + Kafka.

The main issue was that the org.apache.spark.sql.AnalysisException: Failed to find data source: kafka exception is thrown because the spark-sql-kafka library is not available on the classpath and org.apache.spark.sql.sources.DataSourceRegister cannot be found in the META-INF/services folder.
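As a quick sanity check (not part of the original answer, just a sketch using the standard ClassLoader API), you can verify whether any DataSourceRegister service file is actually visible on the runtime classpath; if the Kafka integration jar is missing, none of the entries printed will come from spark-sql-kafka:

import scala.collection.JavaConverters._

// List every DataSourceRegister service file visible on the classpath.
val registerFiles = getClass.getClassLoader
  .getResources("META-INF/services/org.apache.spark.sql.sources.DataSourceRegister")
  .asScala
  .toList

registerFiles.foreach(println)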

The following block needs to be added to build.sbt. It makes sure the org.apache.spark.sql.sources.DataSourceRegister files are included in the final jar instead of being discarded with the rest of META-INF.

// META-INF discarding
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case "application.conf"                        => MergeStrategy.concat
  case _                                         => MergeStrategy.first
}

spark-sql-kafka-0-10 is not "provided", so remove that scope from its dependency. (spark-sql is provided, though, so you could add "provided" to that one instead.)

You also shouldn't be pulling in Kafka Streams (Spark doesn't use it), and kafka-clients is pulled in transitively by spark-sql-kafka, so it isn't needed either.
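Putting those two points together, the dependency list would look roughly like this (a sketch based on the advice above, not a verified build):

libraryDependencies ++= List(
  "org.scalatest"              %% "scalatest"             % "3.0.5",
  "com.typesafe.scala-logging" %% "scala-logging"         % "3.9.0",
  // provided by the Spark runtime, so it can be excluded from the assembly jar
  "org.apache.spark"           %% "spark-sql"             % "2.4.0" % "provided",
  // must be bundled in the assembly jar so the kafka source is on the classpath
  "org.apache.spark"           %% "spark-sql-kafka-0-10"  % "2.4.0"
  // kafka-clients comes in transitively; kafka-streams is not needed at all
)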
