I am trying to create a simple DataFrame, as follows:
import sqlContext.implicits._

val lookup = Array("one", "two", "three", "four", "five")
val theRow = Array("1", Array(1, 2, 3), Array(0.1, 0.4, 0.5))
val theRdd = sc.makeRDD(theRow)

case class X(id: String, indices: Array[Integer], weights: Array[Float])

val df = theRdd.map {
  case Array(s0, s1, s2) =>
    X(s0.asInstanceOf[String], s1.asInstanceOf[Array[Integer]], s2.asInstanceOf[Array[Float]])
}.toDF()
df.show()
The df is defined as
df: org.apache.spark.sql.DataFrame = [id: string, indices: array<int>, weights: array<float>]
which is exactly what I want.
But on execution I get

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 50, localhost): scala.MatchError: 1 (of class java.lang.String)
Where does this MatchError come from? And, is there a simpler way to create sample DataFrames programmatically?
Here is another example you can refer to:
import spark.implicits._

val columns = Array("id", "first", "last", "year")

val df1 = sc.parallelize(Seq(
  (1, "John", "Doe", 1986),
  (2, "Ive", "Fish", 1990),
  (4, "John", "Wayne", 1995)
)).toDF(columns: _*)

val df2 = sc.parallelize(Seq(
  (1, "John", "Doe", 1986),
  (2, "IveNew", "Fish", 1990),
  (3, "San", "Simon", 1974)
)).toDF(columns: _*)
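With the data above, calling show() on the first DataFrame should print something like:

df1.show()
//+---+-----+-----+----+
//| id|first| last|year|
//+---+-----+-----+----+
//|  1| John|  Doe|1986|
//|  2|  Ive| Fish|1990|
//|  4| John|Wayne|1995|
//+---+-----+-----+----+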
First, theRow should be a Row and not an Array. Because theRow is an Array, sc.makeRDD(theRow) distributes its three elements as three separate records, so the map sees the bare String "1" and the pattern Array(s0, s1, s2) cannot match it; that is where the scala.MatchError comes from. Second, if you fix your types so that compatibility between Java and Scala is respected (for example Array[Double] instead of Array[Float], since the literals 0.1, 0.4, 0.5 are Doubles), your example works:
import org.apache.spark.sql.Row

val theRow = Row("1", Array[java.lang.Integer](1, 2, 3), Array[Double](0.1, 0.4, 0.5))
val theRdd = sc.makeRDD(Array(theRow))

case class X(id: String, indices: Array[Integer], weights: Array[Double])

val df = theRdd.map {
  case Row(s0, s1, s2) =>
    X(s0.asInstanceOf[String], s1.asInstanceOf[Array[Integer]], s2.asInstanceOf[Array[Double]])
}.toDF()
df.show()
//+---+---------+---------------+
//| id|  indices|        weights|
//+---+---------+---------------+
//|  1|[1, 2, 3]|[0.1, 0.4, 0.5]|
//+---+---------+---------------+
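As for the second part of the question: with Spark 2.x you usually do not need an intermediate RDD at all, since a Seq of case class instances converts straight to a DataFrame. A minimal sketch, assuming a spark-shell session where spark is the SparkSession; Sample is a hypothetical case class standing in for X, using plain Scala element types:

import spark.implicits._

// Hypothetical case class equivalent to X above, with plain Scala types
case class Sample(id: String, indices: Array[Int], weights: Array[Double])

// A Seq of case class instances converts directly;
// no RDD, Row, or pattern matching needed
val df = Seq(
  Sample("1", Array(1, 2, 3), Array(0.1, 0.4, 0.5))
).toDF()
df.show()
//+---+---------+---------------+
//| id|  indices|        weights|
//+---+---------+---------------+
//|  1|[1, 2, 3]|[0.1, 0.4, 0.5]|
//+---+---------+---------------+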