我用spark-sql-kafka-0-10
用Spark 2.4
和scala 2.11.12
从kafka读取批处理。所以我的build.sbt
文件有以下依赖项。
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
我还使用sbt-assembly
插件来制作我的应用程序的胖罐子。在本地 Spark 上部署此 jar 文件时效果很好,如下所示,$FAT_JAR 指向我的程序集文件:
./spark-submit --class $MAIN_CLASS --master local --driver-class-path $FAT_JAR $FAT_JAR
但是当我将其部署在集群上时(即使工作线程和主节点都在同一台机器上),它会抛出有关TopicPartiton
反序列化问题的异常。
我如何在集群上运行:
./spark-submit
--master spark://spark-master:7077
--class $MAIN_CLASS
--driver-class-path $FAT_JAR
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
$FAT_JAR
我也尝试过--jars
,我确信工人和主人有kafka-client
的保存版本,这是2.0.0
异常日志:
Caused by: java.io.InvalidClassException: org.apache.kafka.common.TopicPartition; class invalid for deserialization
at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:169)
at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:874)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2043)
为什么 Spark 无法反序列化TopicPartition
以及如何解决它?
我找到了解决方案。正如我SPARK_DIST_PATH
设置为$(hadoop classpath)
,它包括kafka-client-0.8
与spark-sql-kafka-0-10
中使用的kafka-client-2.0.0
不同。我刚刚使用了 spark 的 hadoop 包含版本和未设置SPARK_DIST_PATH
来解决它。
无论如何,我希望spark.executor.userClassPathFirst
和spark.driver.userClassPathFirst
有助于解决这个问题,但现在它们只是实验性的。