如何为组件"Write data to HDFS directory"编写 Scala 测试



我有一个简单的代码,以csv和parquet格式将数据写入hdfs,我如何在这里编写scala测试,可以测试以下组件。我实际上不能将数据写入hdfs(在测试中),因为代码是在jules管道中运行的。任何建议都会有帮助的。

df
.write.format("com.databricks.spark.csv")
.option("header", "true")
.mode("append")
.save(hdfspath)

你可以用你的模式写一个样本数据到一个本地路径,使用spark读取它,并比较预期的和实际的输出。

下面是一个使用ScalaTest的例子:

import org.scalatest.FunSuite
import org.scalatest.Matchers
import org.apache.spark.sql.functions.input_file_name
case class RecordSchema(id: Int, value: String) // define here your real schema
class WriteTest extends FunSuite with Matchers {
test("test data was written properly") {
import spark.implicits._
val path = "local/path/dir"
val expectedData = List(RecordSchema(1, "dummyValue1"), RecordSchema(2, "dummyValue2"))
expectedData.toDF
.write.format("com.databricks.spark.csv")
.option("header", "true")
.mode("append")
.save(path)
val actualData = spark.read.format("com.databricks.spark.csv")
.load(path)

// test that the data was written as expected
actualData.as[RecordSchema].collect should contain theSameElementsAs expectedData

}
}

这只是一个示例,您可以将write组件封装到一个单独的方法中(以便将其作为组件进行测试,而不是复制其代码)。请注意将测试中的数据写入新路径(或者在测试中提前删除该路径的内容),否则,由于使用append模式写入,因此此测试的逻辑将无法工作。

最新更新