使用 Spark 数据帧进行单元测试



我正在尝试测试对数据帧执行转换的程序的一部分我想测试这些数据帧的几种不同变体,这排除了从文件中读取特定 DF 的选项

所以我的问题是:

  1. 有没有关于如何使用 Spark 和数据帧执行单元测试的好教程,尤其是关于数据帧的创建?
  2. 如何在没有大量样板文件和不从文件中读取这些样板的情况下创建这些不同的多行数据帧?
  3. 是否有任何实用程序类用于检查数据帧中的特定值?

我之前显然用谷歌搜索过,但找不到任何非常有用的东西。我发现的更有用的链接包括:

  • 使用数据帧运行基本单元测试
  • 使用 DF 定制断言

如果示例/教程在 Scala 中,那就太好了,但我会采用您拥有的任何语言

提前致谢

此链接显示了我们如何以编程方式创建具有架构的数据框。您可以将数据保存在单独的特征中,并将其与测试混合在一起。例如

// This example assumes CSV data. But same approach should work for other formats as well.
trait TestData {
  val data1 = List(
    "this,is,valid,data",
    "this,is,in-valid,data",
  )
  val data2 = ...  
}

然后使用ScalaTest,我们可以做这样的事情。

class MyDFTest extends FlatSpec with Matchers {
  "method" should "perform this" in new TestData {
     // You can access your test data here. Use it to create the DataFrame.
     // Your test here.
  }
}

若要创建数据帧,可以使用如下所示的实用方法。

  def schema(types: Array[String], cols: Array[String]) = {
    val datatypes = types.map {
      case "String" => StringType
      case "Long" => LongType
      case "Double" => DoubleType
      // Add more types here based on your data.
      case _ => StringType
    }
    StructType(cols.indices.map(x => StructField(cols(x), datatypes(x))).toArray)
  }
  def df(data: List[String], types: Array[String], cols: Array[String]) = {
    val rdd = sc.parallelize(data)
    val parser = new CSVParser(',')
    val split = rdd.map(line => parser.parseLine(line))
    val rdd = split.map(arr => Row(arr(0), arr(1), arr(2), arr(3)))
    sqlContext.createDataFrame(rdd, schema(types, cols))
  }

我不知道有任何用于检查数据帧中特定值的实用程序类。但我认为使用数据帧 API 编写一个应该很简单。

你可以

使用 Spark 用于自己的单元测试的SharedSQLContextSharedSparkSession。查看我的答案以获取示例。

对于那些希望在 Java 中实现类似目标的人,您可以使用 start 首先使用此项目在单元测试中初始化 SparkContext: https://github.com/holdenk/spark-testing-base

我个人不得不模仿一些AVRO文件的文件结构。因此,我使用 Avro-tools (https://avro.apache.org/docs/1.8.2/gettingstartedjava.html#download_install) 使用以下命令从二进制记录中提取架构:

java -jar $AVRO_HOME/avro tojson largeAvroFile.avro | head -3

然后,使用此小型帮助程序方法,可以将输出 JSON 转换为数据帧,以便在单元测试中使用。

private DataFrame getDataFrameFromList() {
    SQLContext sqlContext = new SQLContext(jsc());
    ImmutableList<String> elements = ImmutableList.of(
        {"header":{"appId":"myAppId1","clientIp":"10.22.63.3","createdDate":"2017-05-10T02:09:59.984Z"}}
        {"header":{"appId":"myAppId1","clientIp":"11.22.63.3","createdDate":"2017-05-11T02:09:59.984Z"}}
        {"header":{"appId":"myAppId1","clientIp":"12.22.63.3","createdDate":"2017-05-11T02:09:59.984Z"}}
    );
    JavaRDD<String> parallelize = jsc().parallelize(elements);
    return sqlContext.read().json(parallelize);
}

相关内容

  • 没有找到相关文章

最新更新