BigQuery 中的行级原子合并替换



对于我的用例,我正在使用源中唯一键可识别的数据,这些数据分解为加载到 BigQuery 表中的 n(非确定性(目标条目数,以用于分析目的。

构建此 ETL 以使用 Mongo 最近的更改流功能,我想删除 BigQuery 中的所有条目,然后以原子方式加载新条目。

探索 BigQuery DML 我看到支持合并操作,但只能WHEN MATCHED THEN DELETEWHEN MATCHED THEN UPDATE

我对匹配时然后删除,然后是插入操作感兴趣。

我将如何在 BigQuery 中实现这样的 ETL,同时在数据可用性和正确性方面尽可能保持原子或最终一致性。


编辑1:我想提供一个具体的例子来详细说明。

我在这个数据集上的唯一性的最低粒度是user_id。行不能唯一标识。

1.

从 mongo 更改流收到的更新用户对象:

user={_id: "3", name="max", registered="2018-07-05" q=["a", "b", "c"]}

阿拉伯数字。

当前BigQuery.user_q保留

| user_id | q |
...
|    3    | a |
|    3    | b |
...

3.

转换代码将修改的用户对象加载到BigQuery.user_q_incoming

| user_id | q |
|    3    | a |
|    3    | b |
|    3    | c |

4.

合并user_quser_q_incoming

  1. user_q中属于user_id 3的 2 行被删除
  2. user_q_incoming中插入属于user_id 3的 3 行。
  3. user_q中的其余数据(...(保留在原地,未修改。

5.

BigQuery.user_q保留

| user_id | q |
...
|    3    | a |
|    3    | b |
|    3    | c |
...

例如,用户可能会从其个人资料中删除问题。将其余行保留为q=["a", "c"].我也需要这个来转化为 BigQuery 世界观。

INSERT 由 BigQuery DML 支持

MERGE 语句是一个 DML 语句,它可以将 INSERT、UPDATE 和 DELETE 操作组合到单个语句中,并以原子方式执行这些操作。

例如

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON FALSE
WHEN NOT MATCHED AND product LIKE '%washer%' THEN
INSERT (product, quantity) VALUES(product, quantity)
WHEN NOT MATCHED BY SOURCE AND product LIKE '%washer%' THEN
DELETE   

所以,你应该很高兴使用你的ETL

根据添加到问题的更具体的详细信息进行编辑

好的,我明白了 - 我认为在这种情况下,合并将不适用,因为插入只能用于非匹配子句。在这种情况下,有人可能会弄清楚如何欺骗 MERGE 工作,但与此同时,下面的解决方案可以满足您的目标 - 我认为是这样:o(

CREATE OR REPLACE TABLE `project.dataset.user_q` (user_id INT64, q STRING) AS
SELECT * FROM `project.dataset.user_q`
WHERE NOT user_id IN (SELECT DISTINCT user_id FROM `project.dataset.user_q_incoming`)
UNION ALL
SELECT * FROM `project.dataset.user_q_incoming`
WHERE user_id IN (SELECT DISTINCT user_id FROM `project.dataset.user_q`)

有一个类似的问题和一个使 MERGE 工作的走动 (https://issuetracker.google.com/issues/35905927#comment9(。

基本上,像下面这样的东西应该有效,

MERGE `project.dataset.user_q` T
USING (
SELECT *, false AS is_insert FROM `project.dataset.user_q_incoming`
UNION ALL
SELECT *, true AS is_insert FROM `project.dataset.user_q_incoming`
) S
ON T.user_id = S.user_id and NOT is_insert
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED AND is_insert THEN
INSERT(user_id, q) VALUES(user_id, q)

理想情况下,您需要以下内容,但尚不支持。

MERGE `project.dataset.user_q`
USING `project.dataset.user_q_incoming`
ON FALSE
WHEN NOT MATCHED BY TARGET THEN
INSERT(user_id, q) VALUES(user_id, q)
WHEN NOT MATCHED BY SOURCE AND user_id in (SELECT user_id FROM `project.dataset.user_q_incoming`) THEN
DELETE

最新更新