如何使用scala在Spark中处理数据集



我使用DataFrame加载CSV然后转换为DataSet但它显示如下

此行有多个标记:
-无法找到存储在数据集中的类型的编码器。通过导入
来支持基本类型(Int、String等)和Product类型(case类)。spark.implicits。_对序列化其他类型的支持将在未来的版本中添加。
(隐式证据$2:
)org.apache.spark.sql.Encoder [DataSet.spark.aacsv]) org.apache.spark.sql.Dataset [DataSet.spark.aacsv]。未指定值参数证据$2

如何解决这个问题?我的代码是-

case class aaCSV(
    a: String, 
    b: String 
    )
object WorkShop {
  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("readCSV")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val customSchema = StructType(Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true)))
    val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv") 
    df.printSchema()
    df.show()
    val googleDS = df.as[aaCSV]
    googleDS.show()
  }
}

现在我像这样改变main函数-

def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("readCSV")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
   val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
    sa.printSchema()
    sa.show()
}

但是它抛出错误- 异常在线程"main" org.apache.spark.sql.AnalysisException:不能解析' Adj_Close '给定输入列:[_c1, _c2, _c5, _c4, _c6, _c3, _c0];第一行pos 7。我该怎么办?

现在我使用spark调度程序基于给定的时间间隔执行我的方法。但我参考这个链接- https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application。

您是否在csv文件中有标题(列名)?如果是,请尝试添加读语句中的.option("header","true")。例子:sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV] .

下面的博客有不同的Dataframes和Dataset的例子:http://technippet.blogspot.in/2016/10/different-ways-of-creating.html

在将DF转换为DS之前,尝试添加以下导入。

sc.implicits._

sqlContext.implicits._

有关使用DataSet的更多信息https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets

相关内容

  • 没有找到相关文章

最新更新