火花可重复性/确定性结果



我正在运行下面的Spark代码(基本上是作为MVE创建的(,它执行:

  1. 阅读拼花地板和限制
  2. 分区依据
  3. 加入
  4. 过滤器

我很难理解为什么每次运行应用程序时,joined数据帧(即上述第3阶段之后的数据帧(中的行数不同。为什么会发生这种情况?

我认为不应该发生这种情况的原因是limit是确定性的,所以每次分区的数据帧中都应该有相同的行,尽管顺序不同。在连接中,我在分区所在的字段上进行连接。我希望在一个分区内有每个对的组合,但我认为这应该等于每次都有相同的数字。

def main(args: Array[String]) {
val maxRows = args(0)
val spark = SparkSession.builder.getOrCreate()
val windowSpec = Window.partitionBy("epoch_1min").orderBy("epoch")
val data = spark.read.parquet("srcfile.parquet").limit(maxRows.toInt)
val partitionDf = data.withColumn("row", row_number().over(windowSpec))
partitionDf.persist(StorageLevel.MEMORY_ONLY)
logger.debug(s"${partitionDf.count()} rows in partitioned data")
val dfOrig = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_orig").withColumnRenamed("row", "row_orig")
val dfDest = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_dest").withColumnRenamed("row", "row_dest")
val joined = dfOrig.join(dfDest, dfOrig("epoch_1min_orig") === dfDest("epoch_1min_dest"), "inner")
logger.debug(s"Rows in joined dataframe ${joined.count()}")
val filtered = joined.filter(col("row_orig") < col("row_dest"))
logger.debug(s"Rows in filtered dataframe ${filtered.count()}")
}
  1. 如果启动一个新的应用程序,可能会发生底层数据更改
  2. 否则,使用Spark SQL就像在RDBMS上使用ANSI SQL一样,当不使用ORDER BY时,无法保证数据的顺序。因此,您不能假设在执行器分配不同的情况下,第二次处理将是相同的(没有排序/排序(,等等

最新更新