在我的DLP管道中,我有三层-青铜,银色和金色。铜层从S3存储桶读取JSON文件,而银层执行数据处理任务,如添加新列。金层负责对处理过的数据执行聚合。
我想从我的DLP管道的黄金层写入数据到Kafka主题。然而,由于DLT不支持写流操作。我在金表上执行readStream操作,然后尝试在单独的笔记本中将数据写入Kafka。由于黄金表是一个不断更新的物化视图,因此当我试图从中提取数据时,我的readStream代码会失败。如果我尝试使用"忽略更改"选项来防止这个问题,我的表最终会被复制。
处理这个问题最有效的方法是什么?因此,如果您正在更改表的数据(在这种情况下通过覆盖),您不能将其作为流读取。还有另一种可行的解决方案,那就是使用变更数据馈送(CDF)。基本上你可以从gold delta表中消费类似于CDC的事件,就像你从CDC工具中得到的一样,比如Debezium。以下步骤应该可以工作:
- 通过设置表属性
delta.enableChangeDataFeed
到true
,在金表上启用CDF 您可以使用以下PySpark代码将CDF作为流使用:
(
spark
.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("gold_table_name")
)
- 除了原始表的列外,该流还将有列:
_change_type
,_commit_version
和_commit_timestamp
,您可能希望在写入 之前过滤或转换此流 - 用你想要的格式写流到Kafka
更多文档可以在这里找到。