我正试图将日志记录添加到我的pyspark结构化流应用程序中,以便查看处理的每个微批的进度和统计信息。writestream方法使用foreach编写器将数据帧中的行写入postgres数据库。我使用.lastProgress
和pyspark提供的其他标准度量进行日志记录。writestream方法和我的日志尝试如下所示。
query_1 = eventsDF
.writeStream
.foreach(writer)
.outputMode("append")
.option("checkpointLocation", "/tmp/checkpoint_a/")
.trigger(processingTime="5 seconds")
.start()
query_progress = query_1.lastProgress
print("progress ", query_progress)
print("status ", query_1.status)
print("active ", query_1.isActive)
query_1.awaitTermination()
我的第一个循环的结果是:
progress None
status {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
active True
但是,随着事件数据的到达而进行的进一步批处理不会导致更多的日志消息。我希望在流作业中处理每个微批之后都会发出日志消息。
我感谢任何建议或指导。谢谢
start
和awaitTermination
之间的所有代码只执行一次。只有load
和start
之间的代码才能在每个查询触发器上连续执行。
根据这本书";Spark-权威指南";这种监视方式是指在应用程序内部运行。然而,对于独立的应用程序,通常不需要附加shell来运行任意代码。在书中,他们建议";通过实现一个监控服务器来公开[查询]状态,例如一个小型HTTP服务器,它在端口上侦听并在收到请求时返回query.status
。
因此,您需要创建一个专用的可运行线程,该线程经常调用查询的监视API。我真的不熟悉Python,但它基本上看起来如下:
# import the threading module
import threading
class thread(threading.Thread):
def __init__(self, query):
threading.Thread.__init__(self)
self.query = query
# helper function to execute the threads
def run(self):
print("progress ", query.lastProgress);
完成此操作后,您需要将其放置在start
和awaitTermination
:之间
query_1 = eventsDF
[...]
.start()
monitoring = thread(query_1)
query_1.awaitTermination()
您也可以使用while(query_1.isActive)
循环查询的状态,而不是使用专用线程。
对于Scala用户:
awaitTermination后如何获取流式查询的进度?