我用下面的代码创建了一个数据框架
val SomeCsv = spark.read.option("header", "true").
csv(conf.getString("data.path.Somecsv")).toDF()
我有一个函数(到目前为止什么也没做),看起来像这样:
def cleanUp(data: sql.DataFrame): sql.DataFrame = {
data.map({
doc =>
(
doc
)
})
}
在编译时出现错误:
"无法找到存储在数据集中的类型的编码器。通过导入spark.implicits._"
,可以支持基本类型(Int, String等)和产品类型(case类)。我按照其他帖子的建议设置了import语句。
val spark = SparkSession.builder...etc
import spark.implicits._
import语句被IntelliJ标记为未使用
我猜
1.) CSV加载代码使用的编码器是对象而不是原语。
2.)和/或我需要在我的函数语句中指定数据框的数据类型,就像您使用RDD一样?我在Spark文档中找不到任何相关信息。
编辑如果我使用
val SomeOtherCsv = SomeCsv.map(t => t(0) + "foobar")
import语句触发,一切都编译得很好。我现在的问题是,相同数据上的方法版本仍然会中断。
EDIT2
这是MCVE
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._/*statement unused*/
import com.typesafe.config.ConfigFactory
object main {
def main(args: Array[String]) = {
/*load spark conf*/
val sparkConf = new SparkConf().setAppName("main")
val sc = new SparkContext(sparkConf)
/*load configure tool*/
val conf = ConfigFactory.load()
/*load spark session*/
val spark = SparkSession.builder.
master("local")
.appName("tester")
.getOrCreate()
import spark.implicits._/* is used for val ProcessedGenomeCsv but not testFunction*/
/*load genome csv as dataframe, conf.getString points to application.conf which contains a local directory for the csv file*/
val GenomeCsv = spark.read.option("header", "true").
csv(conf.getString("data.path.genomecsv")).toDF()
/*cleans up segment names in csv so the can be matched to amino data*/
def testFunctionOne(data: sql.DataFrame): sql.DataFrame = {/* breaks with import spark.implicits._ error, error points to next line "data.map"*/
data.map({
doc =>
(
doc
)
})
}
val ProcessedGenomeCsv = GenomeCsv.map(t => t(12) + "foobar")/* breaks when adding sqlContext and sqlContext.implicits._, is fine otherwise*/
val FunctionProcessedGenomCsv = testFunctionOne(GenomeCsv)
ProcessedGenomeCsv.take(1).foreach(println)
FunctionProcessedGenomCsv.take(1).foreach(println)
}
}
您需要sqlContext.implicits._
您希望在创建sqlContext(已经在spark-shell中为您创建了,但没有在spark-submit中创建)之后声明它
你想让它看起来像这样:
object Driver {
def main(args: Array[String]):Unit = {
val spark_conf =
new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setAppName("Spark Tika HDFS")
val sc = new SparkContext(spark_conf)
import sqlContext.implicits._
val df = ....
}
}