目前,我在一个单元格中有一个spark数据帧(自动加载器(,可能需要一些时间才能写入数据,这会遇到一些问题。然后,在下面的单元格中,代码引用第一个表所做的工作。然而,如果由于火花的分布性质,整个笔记本都在运行(特别是作为作业(,则第二个电池在第一个电池完全完成之前运行。我怎么能让第二个单元格等待writeStream的完成而不把它们放在单独的笔记本中呢。
示例:
Cell1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Cell2
df = spark.sql('select count(*) from TABLE1')
您需要使用awaitTermination
函数等待流处理完成(请参阅文档(。像这样:
- 单元格1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
autoload.awaitTermination()
- 单元格2
df = spark.sql('select count(*) from TABLE1')
尽管它可以更容易地阅读&像这样的东西更难出错:
df = spark.read.table('TABLE1').count()
更新:等待多个流:
while len(spark.streams.active) > 0:
spark.streams.resetTerminated() # Otherwise awaitAnyTermination() will return immediately after first stream has terminated
spark.streams.awaitAnyTermination()