I am trying to save a Spark DataFrame as a Parquet file, but it fails with the exception below. Please let me know if I am missing something. The DataFrame is built from a Kafka streaming RDD.
dataframe.write.parquet("/user/space")
Exception stack trace:
Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:59)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:309)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:280)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 50 more
Snapshot of the pom.xml being used:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Paymentprocessor</groupId>
  <artifactId>research</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>research</name>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.tools.version>2.10</scala.tools.version>
    <scala.version>2.10.6</scala.version>
    <spark.version>1.6.1</spark.version>
    <scalaCompatVersion>2.10</scalaCompatVersion>
    <maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>
  </properties>

  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Repository</name>
      <url>https://repo1.maven.org/maven2</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <repository>
      <id>mapr-releases</id>
      <url>http://repository.mapr.com/maven/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId>
         <version>2.8.0</version> </dependency> -->
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest</artifactId>
      <version>1.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.15</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.jms</groupId>
          <artifactId>jms</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>net.sf.jopt-simple</groupId>
      <artifactId>jopt-simple</artifactId>
      <version>3.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>com.yammer.metrics</groupId>
      <artifactId>metrics-core</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>com.yammer.metrics</groupId>
      <artifactId>metrics-annotation</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.easymock</groupId>
      <artifactId>easymock</artifactId>
      <version>3.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-csv</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>com.jsuereth</groupId>
      <artifactId>scala-arm_2.10</artifactId>
      <version>1.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.2.3</version>
    </dependency>
    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-producer_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-v09_2.10</artifactId>
         <version>1.6.1-mapr-1607</version> </dependency> <dependency> <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming-kafka-producer_2.10</artifactId> <version>1.6.1-mapr-1607</version>
         </dependency> -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
      <version>2.0.0-preview</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.8</version>
      </plugin>
    </plugins>
  </build>
</project>
Code snippet:
val messagesDStream: InputDStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

val valuesDStream: DStream[String] = messagesDStream.map(_._2)

// Print the number of records received in each batch
valuesDStream.count().print()

valuesDStream.foreachRDD { rdd =>
  // Process only batches that contain at least one element
  if (!rdd.isEmpty) {
    val count = rdd.count
    println("count received " + count)
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    val cdrDF = rdd.map(CallCreditCardRecord.parseCallCreditCardRecord).toDF()
    cdrDF.cache()
    cdrDF.registerTempTable("Card")
    cdrDF.printSchema()
    cdrDF.show()
    cdrDF.write.format("parquet").save("/usr/local/Cellar/hadoop/hdfs/tmp/nm-local-dir/CreditCardRecord.parquet")
  }
}

ssc.start()
ssc.awaitTermination() // was commented out; without it the context is stopped right after it starts
ssc.stop(stopSparkContext = true, stopGracefully = true)
You appear to be mixing different Spark versions. Most likely your cluster (master/workers) runs one Spark version while your driver application runs another, so you get a ClassNotFoundException for classes that exist only in one of those versions.
Specifically, the org.apache.spark.sql.execution.datasources.FileFormat class was created only two weeks ago and is not yet part of any official Spark release. Are you using a "latest master" build of Spark in one of your components? If so, either use it in all of them (but be prepared for some bugs and rough edges), or make sure all of your code is compiled and executed against a single official release.
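As a quick way to confirm this, here is a minimal sketch (assuming you can submit a trivial job to the same cluster; the object name is mine) that compares the Spark version on the driver's classpath with the one loaded in the executor JVMs:

import org.apache.spark.{SparkConf, SparkContext}

object VersionCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("version-check"))
    // Version of the Spark jars on the driver's classpath
    println("Driver Spark version:   " + sc.version)
    // Version of the Spark jars loaded inside an executor JVM
    val executorVersion = sc.parallelize(Seq(1)).map(_ => org.apache.spark.SPARK_VERSION).first()
    println("Executor Spark version: " + executorVersion)
    sc.stop()
  }
}

If the two printed values differ, the driver and the cluster are not running the same build.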
EDIT (after the pom file was posted): your pom contains two different Spark versions, 1.6.1 for most of the dependencies and 2.0.0-preview for the last one:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
  <version>2.0.0-preview</version>
</dependency>
You should remove this dependency; it is not needed with 1.6.1.
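After removing it, you can verify that only 1.6.1 Spark artifacts remain on the compile classpath by running mvn dependency:tree -Dincludes=org.apache.spark and checking the resolved versions.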
I ran into the same problem when trying to read Avro, and fixed it by simply changing my Gradle dependencies to match the Spark version:
dependencies {
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql
    implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.5'
    implementation group: 'org.apache.spark', name: 'spark-avro_2.12', version: '2.4.5'
}
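With the versions aligned like this, the Avro read itself is straightforward. A minimal sketch against the dependencies above (the object name and input path are placeholders):

import org.apache.spark.sql.SparkSession

object AvroReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-read").master("local[*]").getOrCreate()
    // The "avro" format is provided by the spark-avro module declared above
    val df = spark.read.format("avro").load("/tmp/events.avro") // placeholder path
    df.printSchema()
    df.show(5)
    spark.stop()
  }
}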