Spark throws ClassNotFoundException, but the class is in the jar I submit to Spark




Hi, I have a module called output-jms that basically loads data from Kafka (using Spark Structured Streaming), transforms it, and pushes it to ActiveMQ JMS.

When I compile the project, it produces the file output-jms.jar.

The output-jms.jar file is passed to spark-submit.

output-jms.jar contains the class JmsWriter (JmsWriter.scala).
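
For context, the jar is submitted roughly like this (the main class and master here are placeholders, not my exact command):

spark-submit \
  --class com.addmeaning.output.jms.spark.OutputJmsApp \
  --master local[*] \
  output-jms.jar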

When I run the application, I get the following exception:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
... 40 more
Caused by: java.lang.ClassNotFoundException: com.addmeaning.output.jms.spark.service.JmsWriter
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1995)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2169)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2102)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Here is the function I am running:

def writeStreamData(inputDF: DataFrame, serviceDocument: OutputJmsDocument): StreamingQuery = {
  val target = serviceDocument.getDetails.getTarget
  val userName = target.getUsername
  val password = target.getPassword
  val brokerUrl = target.getBrokerUrl
  val clientId = target.getClientId
  val topic = target.getTopic
  try {
    inputDF
      .repartition(1)
      .select(col("_value"))
      .writeStream
      .queryName(target.getQueryName)
      .option("checkpointLocation", target.getCheckpointLocation)
      .trigger(
        serviceDocument.getExecutionType match {
          case ExecutionType.STREAMING =>
            Trigger.ProcessingTime(target.getTriggerInterval)
          case _ => Trigger.Once()
        }
      )
      .foreach(new JmsWriter(userName, password, brokerUrl, clientId, topic))
      .start()
  } catch {
    case e: Throwable =>
      logError("Error write stream: " + e)
      throw e
  }
}

Here is the code of JmsWriter:

import javax.jms.{Connection, MessageProducer, Session}
import org.apache.spark.sql.{ForeachWriter, Row}
// JmsConnector and Logging come from the project's own code

class JmsWriter(username: String, password: String, brokerUrl: String, clientId: String, topic: String)
  extends ForeachWriter[Row] with Logging {

  // Built on the driver when the writer is constructed, then serialized to executors
  val jmsConnectionFactory = JmsConnector.initActiveMqConnectionFactory(username, password, brokerUrl, clientId)

  // Recreated per partition in open(); excluded from serialization
  @transient var connection: Connection = _
  @transient var jmsSession: Session = _
  @transient var jmsProducer: MessageProducer = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = {
      val cn = jmsConnectionFactory.createConnection()
      cn.setClientID(clientId)
      cn.start()
      logInfo("ActiveMQ connection created")
      cn
    }
    jmsSession = JmsConnector.initActiveMqSession(connection)
    jmsProducer = JmsConnector.initActiveMqProducer(jmsSession, topic)
    true
  }

  override def process(value: Row): Unit =
    jmsProducer.send(jmsSession.createTextMessage(value.mkString("")))

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) logError("Error during writing the stream: " + errorOrNull)
    jmsProducer.close()
    jmsSession.close()
    connection.close()
  }
}

What am I doing wrong? How can I get rid of the ClassNotFoundException? Let me know if you need any additional information.

First, you need to check whether the class is actually present in the jar, using:

jar -tvf jarname.jar | grep your_class

Problem solved. Spark could not find the classes because Spring Boot's Maven packaging places the application classes under the BOOT-INF/ folder inside the jar. I did not suspect this, because the Spark code lives in a separate module.
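
For illustration, this is roughly what the jar listing shows in that case (the path is the point; size and date are made up): the class is present, but nested under BOOT-INF/classes/, where Spark's URLClassLoader does not look:

$ jar -tvf output-jms.jar | grep JmsWriter
  4321 Mon Jan 01 00:00:00 UTC 2024 BOOT-INF/classes/com/addmeaning/output/jms/spark/service/JmsWriter.class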

Using the maven-shade-plugin helped.
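
As a minimal sketch, a shade configuration along these lines builds a flat uber-jar with classes at the root (the version and filters are illustrative; adjust them for your build):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <filters>
          <filter>
            <!-- drop signature files, which otherwise invalidate the shaded jar -->
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>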
