Spark is not saving the DataFrame as a Parquet file



I am trying to save a Spark DataFrame as a Parquet file, but it fails with the exception below. Please let me know if I am missing something. The DataFrame is built from a Kafka streaming RDD.

dataframe.write.paraquet("/user/space")

Exception stack trace:

Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:59)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:309)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:280)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 50 more

Snapshot of the pom.xml used:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>Paymentprocessor</groupId>
    <artifactId>research</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>research</name>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.10</scala.tools.version>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.1</spark.version>
        <scalaCompatVersion>2.10</scalaCompatVersion>
        <maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>
    </properties>
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Repository</name>
            <url>https://repo1.maven.org/maven2</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>
    <dependencies>
    <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> 
            <version>2.8.0</version> </dependency> -->
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest</artifactId>
            <version>1.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.sf.jopt-simple</groupId>
            <artifactId>jopt-simple</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.4</version>
        </dependency>

        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-annotation</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.easymock</groupId>
            <artifactId>easymock</artifactId>
            <version>3.0</version>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.1</version>
        </dependency>
        <dependency>
            <groupId>com.jsuereth</groupId>
            <artifactId>scala-arm_2.10</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-producer_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-v09_2.10</artifactId> 
            <version>1.6.1-mapr-1607</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> 
            <artifactId>spark-streaming-kafka-producer_2.10</artifactId> <version>1.6.1-mapr-1607</version> 
            </dependency> -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
            <version>2.0.0-preview</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.8</version>
            </plugin>

        </plugins>
    </build>
</project>

Code snippet:

val messagesDStream: InputDStream[(String, String)] = {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
}

val valuesDStream: DStream[String] = messagesDStream.map(_._2)
println("Count value" + valuesDStream.count())

/* Construct a DataFrame from each Kafka RDD */
valuesDStream.foreachRDD { rdd =>
  // Only process batches that contain at least one element
  if (!rdd.isEmpty) {
    val count = rdd.count
    println("count received " + count)
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    val cdrDF = rdd.map(CallCreditCardRecord.parseCallCreditCardRecord).toDF()
    val cardRDD = cdrDF.cache()
    println("Printing")
    cdrDF.registerTempTable("Card")
    cdrDF.printSchema()
    cdrDF.show()
    cdrDF.write.format("parquet").save("/usr/local/Cellar/hadoop/hdfs/tmp/nm-local-dir/CreditCardRecord.parquet")
  }
}
ssc.start()
//ssc.awaitTermination()
ssc.stop(stopSparkContext = true, stopGracefully = true)

It looks like you are mixing different Spark versions - most likely your cluster (master/workers) runs one Spark version while your driver application was built against another, so you get a ClassNotFoundException for classes that exist in only one of those versions.

Specifically, the class org.apache.spark.sql.execution.datasources.FileFormat was only created about two weeks ago and is not yet part of any official Spark release: are you using Spark's "latest master" in one of your components? If so, either use it everywhere (but be prepared for some bugs and rough edges), or make sure all of your code is compiled and executed against a single official release.
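
One way to narrow this down is to print the Spark version the driver actually runs against and compare it with the <spark.version> pinned in the pom. A minimal sketch (the object name is illustrative, not from the original code; sc.version reports the version on the runtime classpath):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone check: the printed version should match the
// <spark.version> the application was compiled against.
object VersionCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("version-check").setMaster("local[*]"))
    println(s"Runtime Spark version: ${sc.version}")
    sc.stop()
  }
}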

EDIT (after the pom file was posted): your pom file contains two different Spark versions - 1.6.1 for most dependencies and 2.0.0-preview for the last one:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
  <version>2.0.0-preview</version>
</dependency> 

You should remove this dependency (it is not needed with 1.6.1).
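
If that artifact was only pulled in for HiveContext support, the officially released spark-hive module at the same 1.6.1 version would be the consistent choice instead. A minimal sketch, reusing the properties already defined in this pom:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_${scala.tools.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>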

I ran into the same problem when trying to read Avro; I just changed my Gradle dependencies to match the Spark version:

dependencies {
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql
    implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.5'
    implementation group: 'org.apache.spark', name: 'spark-avro_2.12', version: '2.4.5'
}
