我正在尝试打印数据帧的计数,然后打印它的前几行,最后将其发送出去进行进一步处理。
奇怪的是,在调用count()
后,数据帧变为空。
val modifiedDF = funcA(sparkDF)
val deltaDF = modifiedDF.except(sparkDF)
println(deltaDF.count()) // prints 10
println(deltaDF.count()) //prints 0, similar behavior with show
funcB(deltaDF) //gets null dataframe
我能够使用deltaDF.collect.foreach(println)
和随后对count
的调用来验证相同的内容。
但是,如果我不调用count
或show
,而只是按原样发送,funcB
得到 10 行的整个 DF。
这是意料之中的吗?
funcA()
及其依赖关系的定义:
def funcA(inputDataframe: DataFrame): DataFrame = {
val col_name = "colA"
val modified_df = inputDataframe.withColumn(col_name, customUDF(col(col_name)))
val modifiedDFRaw = modified_df.limit(10)
modifiedDFRaw.withColumn("colA", modifiedDFRaw.col("colA").cast("decimal(38,10)"))
}
val customUDF = udf[Option[java.math.BigDecimal], java.math.BigDecimal](myUDF)
def myUDF(sval: java.math.BigDecimal): Option[java.math.BigDecimal] = {
val strg_name = Option(sval).getOrElse(return None)
if (change_cnt < 20) {
change_cnt = change_cnt + 1
Some(strg_name.multiply(new java.math.BigDecimal("1000")))
} else {
Some(strg_name)
}
}
首先,用作UserDefinedFunction
的函数必须至少是幂等的,但最佳纯度。否则,结果只是不确定的。虽然最新版本中提供了一些逃生舱口(可以提示 Spark 不应重新执行函数),但这些在这里对您没有帮助。
此外,具有可变稳定(目前还不清楚change_cnt
的来源是什么,但它在udf
中都是写入和读取的)根本不行 - Spark不提供全局可变状态。
总体而言,您的代码:
- 修改某个对象的某个本地副本。
- 根据此类对象做出决策。
不幸的是,这两个组件根本无法挽救。您必须回到规划阶段并重新考虑您的设计。
您的数据帧是一个分布式数据集,尝试执行 count() 会返回不可预测的结果,因为每个节点中的 count() 可能不同。阅读下面有关 RDD 的文档。它也适用于数据帧。
https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#understanding-closures- https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#printing-elements-of-an-rdd