我有一个我使用DataFrame API加载的1GB CSV文件。我还实施了一个自定义的Transformer
,该自定义可以通过Estimator
进行处理。
transform
方法正在执行一些不同的操作:
- 铸造柱。
- 过滤行。
- 删除列。
- 创建在其他列上应用功能的新列。
我担心此过程中的内存使用量。如果每次转换后,我将结果存储在变量中,会发生什么?例如(简化):
override def transform(dataset: Dataset[_]): DataFrame = {
val df = dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))
val df2 = df1.filter($"Diverted" === 0)
val df3 = df2.drop(forbiddenVariables: _*)
val df4 = df3.withColumn("DepHour", hourExtractorUdf($"DepTime"))
val df5 = df4.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))
df5
}
假设我是为了记录一个转换和另一种转换之间的目的。
好的。第二个选项。如果我使用var
代替val
?
override def transform(dataset: Dataset[_]): DataFrame = {
var df = dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))
df = df.filter($"Diverted" === 0)
df = df.drop(forbiddenVariables: _*)
df = df.withColumn("DepHour", hourExtractorUdf($"DepTime"))
df = df.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))
df
}
我想现在,在所有过程中,我都没有在内存中加载5个数据范围。对吗?
最后,下一个选项又如何效率吗?
override def transform(dataset: Dataset[_]): DataFrame = {
dataset.withColumn("DayOfWeek", $"DayOfWeek".cast("int"))
.filter($"Diverted" === 0)
.drop(forbiddenVariables: _*)
.withColumn("DepHour", hourExtractorUdf($"DepTime"))
.select($"*", concat($"Origin", lit("-"), $"Dest").as("Route"))
}
当然,我认为没有比其他选择更昂贵的选择。
代码的所有版本都是等效的,因为它们在末尾产生相同的数据框,并且没有执行副作用。似乎对火花如何运作有一些根本的误解。数据范围不包含数据。它们只是执行计划。
在学习火花中,我们经常讨论"转换"one_answers"动作"之间的区别。
转换修改数据,这些是filter
,select
,drop
和任何其他修改您的数据框的方法。"转换"做零工作,他们只是构建执行计划。
另一方面的动作实际上会执行一些明显的效果。这些是保存到文件,收集结果或用foreach消费数据的事情。只有在调用操作时,您的数据帧才能评估并运行转换。
1GB的数据也很小,如果您确实需要使用Spark,则可能会重新考虑。