在我的pyspark代码中,我读取测试csv文件,过滤它,并写入。所有的行动,我可以看到在控制台与json
格式的LoggingLineageDispatcher
,但我想找到一种方法来获得这些数据正确在我的python代码。
我的pyspark
代码:
session = create_spark_session()
test_df: DataFrame = session.read.csv(
"test.csv",
sep =',',
header = True
)
mc_df = test_df.filter(col("Card Type Code") == "MC")
mc_df.write.csv("mc.csv")
session.stop()
我用
spark-submit
--packages za.co.absa.spline.agent.spark:spark-3.2-spline-agent-bundle_2.12:0.7.8
--conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener"
--conf "spark.spline.lineageDispatcher=logging"
spline_test.py
控制台输出的例子,我想在python代码中得到:
22/05/27 18:52:50 INFO LoggingLineageDispatcher: ["plan",{"id":"8408ed4b-2f96-5076-
aaab-59ac3beb7568","name":"spline_test.py","operations":{"write":
{"outputSource":"mc.csv","append":false,"id":"op-
0","name":"InsertIntoHadoopFsRelationCommand","childIds":["op-1"],"params":{"path"....
简短的回答是是的,这是可行的。但是也有一些But。我来详细说明一下。样条代理由Spark驱动程序从一个单独的线程调用,因此要将沿袭内容拉到一个变量中,您需要以并发的方式进行操作。用Spark 2。它更容易,因为操作是阻塞的,当控件返回时,样条工作已经完成,所有的调度程序都被调用,所以您可以期望捕获沿袭。然而,在Spark 3+中,事件监听器与操作是异步处理的,因此您需要实现某种同步,并等待直到继承内容准备好并写入变量。这不是那么直截了当,但却是可行的。我们在集成测试中这样做。看看LineageCaptor类,以及它在一些测试中的用法,例如BasicIntegrationTests
所以,简而言之,创建一个自定义LineageDispatcher,它将获取沿袭信息(一个执行计划和一个事件对象),并将其放入一个"全局"。您的代码也可以访问的线程安全变量。变量应该是线程安全的,例如Promise
或并发集合。然后运行Spark操作,让代码等待,直到沿袭信息到达该变量。
一个自定义LineageDispatcher项目的例子可以在这里找到- https://github.com/AbsaOSS/spline-getting-started/tree/main/spark-agent-extension-example基本上,该项目构建了一个包含自定义扩展的JAR。将该JAR与Spline代理JAR一起包含到Spark驱动程序类路径中,然后在Spline配置中注册并激活它,例如
pyspark ...
--jars my-extension.jar
--packages za.co.absa.spline.agent.spark:spark-2.4-spline-agent-bundle_2.12
--conf "spark.spline.lineageDispatcher.my.className=org.example.MyLineageDispatcher"
--conf "spark.spline.lineageDispatcher=my"
...