如何使用foreachbatch报告微批大小



我有一个当前的spark作业,看起来像这样(用pyspark编写)。它所做的只是从S3位置读取数据,并将增量位置写入另一个S3位置(s3_delta_path)。我想稍微修改一下,使用"foreachbatch"函数,以便插入一些额外的逻辑来跟踪进度。具体来说,每次它写一个批处理,我只想计算这个批处理的大小,然后把结果写在别的地方。除此之外,我希望转会工作的核心逻辑保持不变。我怎样才能做到这一点呢?

df.writeStream.format("delta").outputMode("append").option("checkpointLocation",
s3_checkpoint_path).trigger(availableNow=True).start(s3_delta_path)```

希望这对你有帮助。您可以根据需要修改processBatch函数。

def processBatch(batch_df, batch_id):
print(batch_id,batch_df.count(),batch_df.columns())

#batch_df.count() -- returns the number of rows
#batch_df.columns() -- returns the number of columns
batch_df.write.....


df.writeStream
.format("delta")
.foreachBatch(processBatch)
.outputMode("append")
.option("checkpointLocation",s3_checkpoint_path)
.trigger(availableNow=True)
.start()

最新更新