Spark执行计划中存在多个



我目前有一些spark代码(pyspark),它从S3加载数据并对其应用几个转换。当前代码的结构是这样的:一路上有一些持久化,格式如下

df = spark.read.csv(s3path)
df = df.transformation1
df = df.transformation2
df = df.transformation3
df = df.transformation4
df.persist(MEMORY_AND_DISK)
df = df.transformation5
df = df.transformation6
df = df.transformation7
df.persist(MEMORY_AND_DISK)
.
.
.
df = df.transformationN-2
df = df.transformationN-1
df = df.transformationN
df.persist(MEMORY_AND_DISK)

当我在所有转换的最后执行df.explain()时,如预期的那样,执行计划中有多个持久化。现在,当我在所有这些转换的末尾执行以下操作

print(df.count())

触发所有转换,包括持久化。由于spark将流经执行计划,因此它将执行所有这些持久化。在执行第n次持久化时,是否有任何方法可以通知Spark取消第n -1次持久化,或者Spark是否足够聪明地做到这一点。我的问题源于这样一个事实,后来在程序中,我耗尽了磁盘空间,即,火花错误与以下错误:

No space left on device

一个简单的解决方案当然是增加底层实例的数量。但我的假设是,大量的持久化操作最终会导致磁盘耗尽空间。

我的问题是,是这些驴子造成了这个问题吗?如果是这样的话,构造代码的最佳方法/实践是什么,以便我可以自动取消第n -1个持久化。

我对Scala Spark更有经验,但它绝对可以不持久化一个数据框架。

实际上,Dataframe的Pyspark方法也称为unpersist。因此,在您的示例中,您可以这样做(这是相当粗糙的):

df = spark.read.csv(s3path)
df = df.transformation1
df = df.transformation2
df = df.transformation3
df = df.transformation4
df.persist(MEMORY_AND_DISK)
df1 = df.transformation5
df1 = df1.transformation6
df1 = df1.transformation7
df.unpersist()
df1.persist(MEMORY_AND_DISK)
.
.
.
dfM = dfM-1.transformationN-2
dfM = dfM.transformationN-1
dfM = dfM.transformationN
dfM-1.unpersist()
dfM.persist(MEMORY_AND_DISK)

现在,这段代码的样子引发了我的一些问题。为了能够问这个问题,您可能主要将此编写为伪代码,但仍然可能以下问题可以进一步帮助您:

  • 我只看到转换在那里,没有动作。如果是这样的话,你还需要坚持吗?
  • 另外,你似乎只有一个数据源(spark.read.csv位):这似乎也暗示不一定需要持续。
  • 这更多的是关于风格的一点(也许是固执己见的,所以如果你不同意也不用担心)。正如我一开始所说的,我没有Pyspark的经验,但是我将用Scala Spark编写类似于你所写的东西的方式是这样的:
df = spark.read.csv(s3path)
.transformation1
.transformation2
.transformation3
.transformation4
.persist(MEMORY_AND_DISK)
df = df.transformation5
.transformation6
.transformation7
.persist(MEMORY_AND_DISK)
.
.
.
df = df.transformationN-2
.transformationN-1
.transformationN
.persist(MEMORY_AND_DISK)

这样就不那么啰嗦了,而且在我看来更"真实"一点。对于实际发生的事情,只是原始数据框架上的转换链。

相关内容

  • 没有找到相关文章