I've run into a problem while trying to create a DataFrame in a Scala application I'm writing.
The problem is that compiling the Scala code fails with an error saying that toDF is not a member of the RDD. I've seen answers suggesting moving the case class definition out of main and importing the implicits after the sqlContext declaration, but even that didn't work for me.
This is what I have so far:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object ErrorParser {

  case class Error(time: String, status: String, statusType: String, host: String, message: String)

  // Splits a raw log line into its five fields.
  def splitError(line: String): Array[String] = {
    var array: Array[String] = new Array[String](5)
    ...
    return array
  }

  // Drops the error records I am not interested in.
  def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = {
    var filteredErrors = ArrayBuffer[Array[String]]()
    ...
    return filteredErrors.toArray
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ErrorParserAPI")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
    var errors = logs.filter(line => line.contains("ERROR"))
    val errors1 = errors.map(line => splitError(line))
    val filteredErrors = filterErrors(errors1.collect)
    val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
    val filteredRDD = sc.parallelize(dfErrors)
    var errorDF = filteredRDD.toDF()   // <-- this is the call the compiler rejects
    errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult")
  }
}
I'm stumped, because the same thing works in the spark-shell.
I've also seen answers suggesting converting the RDD into an RDD[Row] and then using
sqlContext.createDataFrame(rdd, schema)
but I can't picture how I would do that.
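If I understand those answers correctly, it would look roughly like the sketch below (my own guess based on the SQLContext docs, reusing filteredRDD and sqlContext from the code above, not something I have actually run):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Describe the five string columns explicitly instead of relying on the case class.
val schema = StructType(Seq(
  StructField("time", StringType),
  StructField("status", StringType),
  StructField("statusType", StringType),
  StructField("host", StringType),
  StructField("message", StringType)
))

// Turn each Error into a generic Row, then hand the RDD[Row] plus the schema to the SQLContext.
val rowRDD: RDD[Row] = filteredRDD.map(e => Row(e.time, e.status, e.statusType, e.host, e.message))
val errorDF = sqlContext.createDataFrame(rowRDD, schema)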
Any help would be greatly appreciated!
Here is my .sbt file:
name := "ErrorParserAPI"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)
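(As an aside, the same dependencies can also be written with sbt's %% operator, which appends the Scala binary-version suffix matching scalaVersion automatically instead of hard-coding it; my actual file is exactly as shown above.)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1",  // %% resolves to spark-core_<scala binary version>
  "org.apache.spark" %% "spark-sql"  % "1.6.1"
)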
EDIT: a typo
I just copied your code and pasted it into my Eclipse, and it works fine with no compile errors. If you are using Eclipse, you could try cleaning and refreshing your project.
import scala.Array.canBuildFrom
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object ErrorParser {

  def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = {
    var filteredErrors = ArrayBuffer[Array[String]]()
    return filteredErrors.toArray
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ErrorParserAPI")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
    var errors = logs.filter(line => line.contains("ERROR"))
    val errors1 = errors.map(line => splitError(line))
    val filteredErrors = filterErrors(errors1.collect)
    val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
    val filteredRDD = sc.parallelize(dfErrors)
    var errorDF = filteredRDD.toDF()
  }

  case class Error(time: String, status: String, statusType: String, host: String, message: String)

  def splitError(line: String): Array[String] = {
    var array: Array[String] = new Array[String](5)
    return array
  }
}