I am trying to run a very simple Spark Streaming word count program that reads from a Kafka topic. Here is my code:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("KafkaWordCount")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._

// Read the Kafka topic "test" as a streaming DataFrame
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

// Split each record value into words and count occurrences
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val words = lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Print the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
When I run this program, I get the following exception:
Exception in thread "stream execution thread for [id = f704d6e5-14bf-4bd7-94a0-38c4b77986ea, runId = d277eaac-e18c-4128-954b-6a318bb8039c]" Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:394)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.4
at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
... 28 more
org.apache.spark.sql.streaming.StreamingQueryException: null
Other Stack Overflow answers suggest including a different version of Jackson in the pom. However, this is not a Maven project but an sbt project. Below is my build.sbt:
name := "spark"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1",
  "org.apache.kafka" %% "kafka" % "1.1.0"
)
What do I need to do to get rid of this error and make the program work?
Apache Spark uses Jackson version 2.6.7:
<fasterxml.jackson.version>2.6.7</fasterxml.jackson.version>
<fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version>
See the Spark project pom.xml.
whereas Kafka uses Jackson version 2.9.6:
versions += [
  activation: "1.1.1",
  apacheda: "1.0.0",
  apacheds: "2.0.0-M24",
  argparse4j: "0.7.0",
  bcpkix: "1.59",
  easymock: "3.6",
  jackson: "2.9.6",
  jetty: "9.2.24.v20180105",
  ...
]
See Kafka's gradle/dependencies.gradle.
To fix this, override the conflicting jar version as follows:
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"
See this link for more details:
https://www.scala-sbt.org/1.x/docs/Library-Management.html#Overriding+a+version
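Putting the pieces together, here is a minimal sketch of what the complete build.sbt could look like with that override in place. It simply combines the dependencies from the question with the dependencyOverrides line above; adjust the pinned Jackson version if your Spark build expects a different 2.6.x release.

name := "spark"
version := "0.1"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.3.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1",
  "org.apache.kafka" %% "kafka" % "1.1.0"
)

// Pin jackson-databind to the version Spark 2.3.1 was built against,
// so the newer Jackson pulled in transitively by the Kafka client does not win.
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"

If the error persists, check whether other Jackson artifacts (jackson-core, jackson-module-scala) also come in at a newer version and pin them to the same 2.6.x line.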
I also got the same error due to an incompatible Jackson version.
The following dependencies already existed in my pom:

<properties>
  <!-- ... other properties ... -->
  <scala.version>2.12.11</scala.version>
  <jackson.version>2.11.2</jackson.version>
</properties>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>${jackson.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>${jackson.version}</version>
  <scope>compile</scope>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>${jackson.version}</version>
  <scope>compile</scope>
</dependency>
I was still getting the incompatible version error. After a lot of searching, I added the dependency below and it started working.
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
  <version>${jackson.version}</version>
</dependency>
Replace ${scala.binary.version} with your Scala binary version and ${jackson.version} with your Jackson version.
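Whichever build tool you use, a quick way to confirm which Jackson version actually ends up on the classpath is to print it at runtime before starting the stream. A small sketch; ObjectMapper.version() and the code-source lookup are plain Jackson/JVM calls, nothing Spark-specific:

import com.fasterxml.jackson.databind.ObjectMapper

object JacksonVersionCheck {
  def main(args: Array[String]): Unit = {
    // The jackson-databind version the class loader actually resolved
    println("jackson-databind version: " + new ObjectMapper().version())

    // The jar that ObjectMapper was loaded from, useful when several
    // Jackson jars end up on the classpath
    val location = classOf[ObjectMapper].getProtectionDomain.getCodeSource.getLocation
    println("loaded from: " + location)
  }
}

If the printed version does not match the version your Spark build was compiled against, the override or the extra jackson-module-scala dependency has not taken effect.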