样条,pyspark:如何在我的python代码中获得样条控制台输出?



在我的pyspark代码中,我读取测试csv文件,过滤它,并写入。所有的行动,我可以看到在控制台与json格式的LoggingLineageDispatcher,但我想找到一种方法来获得这些数据正确在我的python代码。

我的pyspark代码:

session = create_spark_session()
test_df: DataFrame = session.read.csv(
"test.csv",
sep =',', 
header = True
)
mc_df = test_df.filter(col("Card Type Code") == "MC")
mc_df.write.csv("mc.csv")
session.stop() 

我用

spark-submit 
--packages za.co.absa.spline.agent.spark:spark-3.2-spline-agent-bundle_2.12:0.7.8 
--conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" 
--conf "spark.spline.lineageDispatcher=logging" 
spline_test.py

控制台输出的例子,我想在python代码中得到:

22/05/27 18:52:50 INFO LoggingLineageDispatcher: ["plan",{"id":"8408ed4b-2f96-5076- 
aaab-59ac3beb7568","name":"spline_test.py","operations":{"write": 
{"outputSource":"mc.csv","append":false,"id":"op- 
0","name":"InsertIntoHadoopFsRelationCommand","childIds":["op-1"],"params":{"path"....

简短的回答是是的,这是可行的。但是也有一些But。我来详细说明一下。样条代理由Spark驱动程序从一个单独的线程调用,因此要将沿袭内容拉到一个变量中,您需要以并发的方式进行操作。用Spark 2。它更容易,因为操作是阻塞的,当控件返回时,样条工作已经完成,所有的调度程序都被调用,所以您可以期望捕获沿袭。然而,在Spark 3+中,事件监听器与操作是异步处理的,因此您需要实现某种同步,并等待直到继承内容准备好并写入变量。这不是那么直截了当,但却是可行的。我们在集成测试中这样做。看看LineageCaptor类,以及它在一些测试中的用法,例如BasicIntegrationTests

所以,简而言之,创建一个自定义LineageDispatcher,它将获取沿袭信息(一个执行计划和一个事件对象),并将其放入一个"全局"。您的代码也可以访问的线程安全变量。变量应该是线程安全的,例如Promise或并发集合。然后运行Spark操作,让代码等待,直到沿袭信息到达该变量。

一个自定义LineageDispatcher项目的例子可以在这里找到- https://github.com/AbsaOSS/spline-getting-started/tree/main/spark-agent-extension-example基本上,该项目构建了一个包含自定义扩展的JAR。将该JAR与Spline代理JAR一起包含到Spark驱动程序类路径中,然后在Spline配置中注册并激活它,例如

pyspark ... 
--jars my-extension.jar 
--packages za.co.absa.spline.agent.spark:spark-2.4-spline-agent-bundle_2.12 
--conf "spark.spline.lineageDispatcher.my.className=org.example.MyLineageDispatcher" 
--conf "spark.spline.lineageDispatcher=my" 
...

最新更新