Spark 1.6.1: creating DataFrame from RDD[Array[Error]]



I've run into a problem while trying to create a DataFrame in a Scala application I'm writing.

The problem I'm hitting is a Scala compile error saying that toDF is not a member of RDD. I've seen some answers suggesting moving the case class definition out of main and importing the implicits after the sqlContext declaration, but even that doesn't work for me.

This is what I have so far:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object ErrorParser {
    case class Error(time: String, status: String, statusType: String, host: String, message: String)
    def splitError(line: String) : Array[String] = {
        var array:Array[String] = new Array[String](5)
        ...
        return array
    }
    def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = {
        var filteredErrors = ArrayBuffer[Array[String]]()
        ...
        return filteredErrors.toArray
    }
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("ErrorParserAPI")
        val sc = new SparkContext(conf)
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._
        var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
        var errors = logs.filter(line => line.contains("ERROR"))
        val errors1 = errors.map(line => splitError(line))
        val filteredErrors = filterErrors(errors1.collect)
        val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
        val filteredRDD = sc.parallelize(dfErrors)
        var errorDF = filteredRDD.toDF()
        errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult")
   }
}

I'm stumped, because in spark-shell this works fine.
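
For example, something like this runs directly in the shell (the sample values here are just made up for illustration):

// spark-shell already provides sc and sqlContext and imports sqlContext.implicits._,
// so toDF() on an RDD of case-class instances resolves without any extra setup
case class Error(time: String, status: String, statusType: String, host: String, message: String)

val sample = Seq(Error("2016-04-01 12:00", "ERROR", "IO", "host-1", "disk full"))
val errorDF = sc.parallelize(sample).toDF()
errorDF.printSchema()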

I've also seen answers suggesting changing the RDD into an RDD[Row] and then using

sc.createDataFrame(rdd, scheme)

but I can't figure out how I would do that.
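
My rough guess at what that would look like, reusing sqlContext and the filteredRDD of Error objects from the code above, is below (untested):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// explicit schema matching the five fields of the Error case class
val schema = StructType(Seq(
    StructField("time", StringType, nullable = true),
    StructField("status", StringType, nullable = true),
    StructField("statusType", StringType, nullable = true),
    StructField("host", StringType, nullable = true),
    StructField("message", StringType, nullable = true)
))

// turn each Error into a generic Row, then build the DataFrame without toDF
val rowRDD = filteredRDD.map(e => Row(e.time, e.status, e.statusType, e.host, e.message))
val errorDF = sqlContext.createDataFrame(rowRDD, schema)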

Any help would be greatly appreciated!

This is my .sbt file:

name := "ErrorParserAPI"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
        "org.apache.spark" % "spark-core_2.10" % "1.6.1",
        "org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)
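
For comparison, a build definition that uses %% (which makes sbt append the Scala binary suffix matching scalaVersion to the artifact names) would look like the sketch below; whether that is related to the error is only an assumption:

// sketch of a build.sbt where the Spark artifacts follow scalaVersion (_2.11 for 2.11.7)
name := "ErrorParserAPI"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "1.6.1",
        "org.apache.spark" %% "spark-sql" % "1.6.1"
)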

EDIT: a typo

I just copied your code and pasted it into my Eclipse, and it compiles and runs fine without any errors. If you are using Eclipse, you could try cleaning and refreshing your project.

import scala.Array.canBuildFrom
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ErrorParser {

  def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = {
    var filteredErrors = ArrayBuffer[Array[String]]()
    return filteredErrors.toArray
  }
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("ErrorParserAPI")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
    var errors = logs.filter(line => line.contains("ERROR"))
    val errors1 = errors.map(line => splitError(line))
    val filteredErrors = filterErrors(errors1.collect)
    val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
    val filteredRDD = sc.parallelize(dfErrors)
    var errorDF = filteredRDD.toDF()
  }
  case class Error(time: String, status: String, statusType: String, host: String, message: String)
  def splitError(line: String): Array[String] = {
    var array: Array[String] = new Array[String](5)
    return array
  }
}
