如何在GCS中的delta表顶部创建BQ外部表,并仅显示最新快照



我正试图在使用谷歌存储作为存储层的delta表之上创建一个外部BQ外部表。在delta表上,我们执行DML,其中包括删除。

我可以在gs存储桶的顶部创建一个BQ外部表,其中存在所有的delta文件。然而,它甚至提取了删除记录,因为BQ外部表无法读取delta的事务日志,它在其中说明了要考虑哪些镶木地板文件以及要删除哪个文件。

除了以编程方式将数据从delta复制到BQ之外,我们有没有办法将BQ中delta表(gs位置(的最新快照公开为外部表?

所以这个问题和一年多前一样被问到了,但我在Oliver的回答中添加了一些棘手但强大的内容,消除了数据重复和额外的加载逻辑。

步骤1按照Oliver的建议生成symlink_format_manifest文件;您可以在每次更新时触发它,也可以在文件中添加tblproperty,如下所述当delta表更新时自动创建这些文件;

ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)

步骤2创建一个指向增量表位置的外部表

> bq mkdef --source_format=PARQUET "gs://test-delta/*.parquet" > bq_external_delta_logs
> bq mk --external_table_definition=bq_external_delta_logs test.external_delta_logs

步骤3创建另一个指向symlink_format_manifest/manifest文件的外部表

> bq mkdef --autodetect --source_format=CSV gs://test-delta/_symlink_format_manifest/manifest > bq_external_delta_manifest
> bq mk --table --external_table_definition=bq_external_delta_manifest test.external_delta_manifest

步骤4使用以下查询创建视图

> bq mk 
--use_legacy_sql=false 
--view 
'SELECT
*
FROM
`project_id.test.external_delta_logs` 
WHERE
_FILE_NAME in (select * from `project_id.test.external_delta_logs`)' 
test.external_delta_snapshot

现在,只要您的delta表从test.external_delta_snapshot视图刷新,您就可以获得最新的快照,而无需任何额外的加载或数据复制此解决方案的一个缺点是,如果架构发生更改,您必须手动或使用BQ客户端等从spark管道向表定义添加新字段。对于那些好奇此解决方案如何工作的人,请继续阅读。


这是如何工作的;

symlink清单文件包含指向当前delta版本分区的换行格式的镶木地板文件列表;

gs://delta-test/......-part1.parquet
gs://delta-test/......-part2.parquet
....

除了delta位置之外,我们还通过将此清单文件视为CSV文件(实际上是单列CSV文件(来定义另一个外部表。我们定义的视图利用了这里提到的_FILE_NAME伪列,它指向表中每一行的镶木地板文件位置。如文档中所述,_FILE_NAME伪列是为指向云存储谷歌硬盘中存储的数据的每个外部表定义的。

因此,在这一点上,我们有了加载最新快照所需的镶木地板文件列表,并能够使用_FILE_NAME列筛选我们想要读取的文件。我们定义的视图只是定义了获取最新快照的过程。每当我们的delta表更新时,manifest和delta日志表都会查找最新的数据,因此我们将始终获得最新的快照,而无需任何额外的加载或数据复制。

最后,众所周知,在外部表上执行比BQ管理的表更昂贵(执行成本(,因此最好像Oliver建议的那样尝试双重写入,并像您要求的那样尝试外部表解决方案。存储比执行更便宜,因此在某些情况下,在GCS和BQ中保存数据的成本可能低于保存这样的外部表。

我也在开发这种管道,我们将delta湖文件转储到GCS中,并在Bigquery中显示。根据GCS增量文件生成清单文件将根据增量文件的当前版本为您提供最新快照。然后,您需要创建一个自定义脚本来解析该清单文件,以获得文件列表,然后运行提及这些文件的bq加载。

val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

以下解决方法可能适用于小型数据集。

有一个单独的BQ表。将delta湖文件读取到DataFrame中,然后将df.overwrite写入BigQuery表中。