对于基本的数据帧创建示例,我应该如何在 Spark 中编写单元测试?



我正在努力编写一个基本的单元测试来创建数据帧,使用 Spark 提供的示例文本文件,如下所示。

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {
private val master = "local[*]"
private val appName = "data_load_testing"
private var spark: SparkSession = _
override def beforeEach() {
spark = new SparkSession.Builder().appName(appName).getOrCreate()
}
import spark.implicits._
case class Person(name: String, age: Int)
val df = spark.sparkContext
.textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0),attributes(1).trim.toInt))
.toDF()
test("Creating dataframe should produce data from of correct size") {
assert(df.count() == 3)
assert(df.take(1).equals(Array("Michael",29)))
}
override def afterEach(): Unit = {
spark.stop()
}

}

我知道代码本身有效(从spark.implicits._....toDF(((,因为我已经在 Spark-Scala 外壳中验证了这一点,但在测试类中我收到了很多错误;IDE 无法识别"导入spark.implicits._"或 toDF((,因此测试不会运行。

我正在使用SparkSession,它可以在引擎盖下自动创建SparkConf,SparkContext和SQLContext。

我的代码仅使用 Spark 存储库中的示例代码。

任何想法为什么这不起作用?谢谢!

铌。我已经看过 StackOverflow 上的 Spark 单元测试问题,比如:如何在 Spark 2.0+ 中编写单元测试? 我已经使用它来编写测试,但我仍然收到错误。

我正在使用带有SBT和IntelliJ的Scala 2.11.8和Spark 2.2.0。这些依赖项正确包含在 SBT 构建文件中。运行测试时的错误是:

错误:(29, 10( value toDF 不是 org.apache.spark.rdd.RDD[dataLoadTest.this.Person] 的成员 可能的原因:也许在"值到DF"之前缺少分号? .toDF((

错误:(20, 20( 需要稳定的标识符,但找到 dataLoadTest.this.spark.implicits。 导入spark.implicits._

IntelliJ 无法识别导入spark.implicits._或 .toDF(( 方法。

我已导入: import org.apache.spark.sql.SparkSession 导入组织标量测试。{BeforeAndAfterEach, FlatSpec, FunSuite, Matchers}

您需要将sqlContext分配给val才能implicits工作。由于您的sparkSessionvarimplicits无法使用它

所以你需要做

val sQLContext = spark.sqlContext
import sQLContext.implicits._

此外,您可以为测试编写函数,以便测试类如下所示

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {
private val master = "local[*]"
private val appName = "data_load_testing"
var spark: SparkSession = _
override def beforeEach() {
spark = new SparkSession.Builder().appName(appName).master(master).getOrCreate()
}

test("Creating dataframe should produce data from of correct size") {
val sQLContext = spark.sqlContext
import sQLContext.implicits._
val df = spark.sparkContext
.textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
assert(df.count() == 3)
assert(df.take(1)(0)(0).equals("Michael"))
}
override def afterEach() {
spark.stop()
}
}
case class Person(name: String, age: Int)

有很多用于 Spark 单元测试的库,其中最常用的是

火花测试基地霍顿·卡劳

这个库都有sc,因为下面的SparkContext是一个简单的例子

class TestSharedSparkContext extends FunSuite with SharedSparkContext {
val expectedResult = List(("a", 3),("b", 2),("c", 4))
test("Word counts should be equal to expected") {
verifyWordCount(Seq("c a a b a c b c c"))
}
def verifyWordCount(seq: Seq[String]): Unit = {
assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
}
}

在这里,每一样东西都是用sc作为SparkContext

另一种方法是创建一个TestWrapper并用于多个testcases,如下所示

import org.apache.spark.sql.SparkSession
trait TestSparkWrapper {
lazy val sparkSession: SparkSession = 
SparkSession.builder().master("local").appName("spark test example ").getOrCreate()
}

并将此TestWrapper用于 Scala 测试的所有tests,与BeforeAndAfterAllBeforeAndAfterEach一起玩。

希望这有帮助!

最新更新