Spark:单元测试 - 我有一个函数可以联合 3 个输入数据集.我应该对它们进行单元测试吗?



我写了一个代码部分,如下所示

Object Cal{
def mergedatasets(df: Dataset[Row], df1: Dataset[Row],df2: Dataset[Row]):Dataset[Row]={
df.union(df1).union(df2)
//other logic
}
}
object readDataframes{
def readFirstDF(spark:SparkSession):Dataset[Row]={
spark.read.json(somefile)
}
def readSecondDF(spark:SparkSession):Dataset[Row]={
spark.read.json(somefile)
}
def readThirdDF(spark:SparkSession):Dataset[Row]={
spark.read.json(somefile)
}
}

在上面的代码中,我正在读取 3 个文件,然后将它们合并为一个我进一步用于处理的文件。 基于上述情况,我的问题如下:

  1. 对函数合并数据集进行单元测试是否有意义?如果是,要测试的基本/最小内容是什么?如何检查角落情况(如果有(?
  2. 对读取数据帧进行单元测试是否有意义?如果是,要测试什么?是检查推断的架构是否符合预期吗?还有什么事吗?

我也想为以下功能扩展上述问题

def timeIntervalAgg(df: Dataset[Row]): Dataset[Row] = {
val timeInterval = df
.groupBy("id","page_number")
.agg(sum("timeInterval").alias("timeInterval"))
timeIntervalAgg
}
def timeInterval(df: Dataset[Row]): Dataset[Row] ={
val windowSpec = Window.partitionBy("id").orderBy("date_time")
val timeFmt = "yyyy-MM-dd'T'HH:mm:ss"
val endTime = lead(col("date_time"),1).over(windowSpec)
val startTime = col("date_time")
val timeDiff = (unix_timestamp(endTime, timeFmt)
- unix_timestamp(startTime, timeFmt))
val timeInterval = df
.withColumn("timeInterval", lit(when(col("event") === "this_event",lit(null)
.cast("long"))
.otherwise(timeDiff)))
.where("""event != "this_event" """)
timeInterval
}
def addOddpages(df: Dataset[Row]) :Dataset[Row] = {
val odd = df
.where("""view_mode = "twin" """)
.withColumn("page_odd", col("page") + 1)
.drop("page")
.select(col("id"), col("date_time")
.cast("timestamp"),col("page_odd")
.alias("page"), col("page_view_mode"),
col("event"),col("timeInterval"))
val timeIntervalWithoddPage = df.union(odd)
timeIntervalWithoddPage
}
  • 请建议是否需要以更好的方式重构代码 以实现更好的测试。

  • 我的目标是了解要测试什么? 同时要注意什么 像上面这样的代码编写测试?所有这些问题都是针对Spark 代码单元测试而不是其他语言代码测试。

  • 如何在不冗余测试已经存在的火花的情况下进行单元测试 测试?
  • 是否需要像这样测试每个函数(因为逻辑/代码不是很复杂(还是最好测试 以适当的顺序组合上述函数的函数。通过做 那么它可以称为单元测试吗?
  • 请随时分享一些您可能编写的示例单元测试 对于上面的代码。

读取 JSON 文件:如果您只读取 JSON 文件,则无需对此进行测试。 此外,最好在schema()中读取具有显式架构的文件,以避免推断架构出现问题。此外,您不需要 3 种相同的方法来读取文件。

联合数据集:从Spark 2.3.0开始,功能unionByName()。 该函数按名称(而不是按位置(解析列。可以考虑使用这些函数,以避免在数据帧具有不同列顺序时出现联合问题。当然,这个函数不需要测试。很难说mergedatasets()方法中的//other logic代码。

对于单元测试,您可以使用 ScalaTest 或其他工具。

  • 使用master("local")创建 SparkSession ;
  • 使用预期数据创建数据帧;
  • 为要测试的每个方法创建一个输入数据帧。
  • 比较预期和实际数据帧;

以下项目可能有用。您可以在那里找到如何比较两个数据帧。此外,自述文件中还有几个示例:https://github.com/MrPowers/spark-fast-tests

相关内容

最新更新