PySpark在笔记本(Databricks)中等待完成



目前,我在一个单元格中有一个spark数据帧(自动加载器(,可能需要一些时间才能写入数据,这会遇到一些问题。然后,在下面的单元格中,代码引用第一个表所做的工作。然而,如果由于火花的分布性质,整个笔记本都在运行(特别是作为作业(,则第二个电池在第一个电池完全完成之前运行。我怎么能让第二个单元格等待writeStream的完成而不把它们放在单独的笔记本中呢。

示例:

Cell1

autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')

Cell2

df = spark.sql('select count(*) from TABLE1')

您需要使用awaitTermination函数等待流处理完成(请参阅文档(。像这样:

  • 单元格1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
autoload.awaitTermination()
  • 单元格2
df = spark.sql('select count(*) from TABLE1')

尽管它可以更容易地阅读&像这样的东西更难出错:

df = spark.read.table('TABLE1').count()

更新:等待多个流:

while len(spark.streams.active) > 0:
spark.streams.resetTerminated() # Otherwise awaitAnyTermination() will return immediately after first stream has terminated
spark.streams.awaitAnyTermination()

最新更新