进口spark.implicits._未使用



我用下面的代码创建了一个数据框架

val SomeCsv = spark.read.option("header", "true").
  csv(conf.getString("data.path.Somecsv")).toDF()

我有一个函数(到目前为止什么也没做),看起来像这样:

def cleanUp(data: sql.DataFrame): sql.DataFrame = {
  data.map({
    doc =>
      (
        doc
        )
  })
}

在编译时出现错误:

"无法找到存储在数据集中的类型的编码器。通过导入spark.implicits._"

,可以支持基本类型(Int, String等)和产品类型(case类)。

我按照其他帖子的建议设置了import语句。

val spark = SparkSession.builder...etc
import spark.implicits._

import语句被IntelliJ标记为未使用

我猜

1.) CSV加载代码使用的编码器是对象而不是原语。

2.)和/或我需要在我的函数语句中指定数据框的数据类型,就像您使用RDD一样?我在Spark文档中找不到任何相关信息。

编辑

如果我使用

val SomeOtherCsv = SomeCsv.map(t => t(0) + "foobar")

import语句触发,一切都编译得很好。我现在的问题是,相同数据上的方法版本仍然会中断。

EDIT2

这是MCVE

import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._/*statement unused*/
import com.typesafe.config.ConfigFactory
object main {
  def main(args: Array[String]) = {
    /*load spark conf*/
    val sparkConf = new SparkConf().setAppName("main")
    val sc = new SparkContext(sparkConf)
    /*load configure tool*/
    val conf = ConfigFactory.load()
    /*load spark session*/
    val spark = SparkSession.builder.
      master("local")
      .appName("tester")
      .getOrCreate()
    import spark.implicits._/* is used for val ProcessedGenomeCsv but not testFunction*/
    /*load genome csv as dataframe, conf.getString points to application.conf which contains a local directory for the csv file*/
    val GenomeCsv = spark.read.option("header", "true").
      csv(conf.getString("data.path.genomecsv")).toDF()
    /*cleans up segment names in csv so the can be matched to amino data*/
    def testFunctionOne(data: sql.DataFrame): sql.DataFrame = {/* breaks with import spark.implicits._ error, error points to next line "data.map"*/
      data.map({
        doc =>
          (
            doc
            )
      })
    }
    val ProcessedGenomeCsv = GenomeCsv.map(t => t(12) + "foobar")/* breaks when adding sqlContext and sqlContext.implicits._, is fine otherwise*/
    val FunctionProcessedGenomCsv = testFunctionOne(GenomeCsv)
    ProcessedGenomeCsv.take(1).foreach(println)
    FunctionProcessedGenomCsv.take(1).foreach(println)
  }
}

您需要sqlContext.implicits._

您希望在创建sqlContext(已经在spark-shell中为您创建了,但没有在spark-submit中创建)之后声明它

你想让它看起来像这样:

object Driver {
    def main(args: Array[String]):Unit = {
        val spark_conf =
          new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .setAppName("Spark Tika HDFS")
        val sc = new SparkContext(spark_conf)
        import sqlContext.implicits._
        val df = ....
    }
}

最新更新