对于我的用例,我正在使用源中唯一键可识别的数据,这些数据分解为加载到 BigQuery 表中的 n(非确定性(目标条目数,以用于分析目的。
构建此 ETL 以使用 Mongo 最近的更改流功能,我想删除 BigQuery 中的所有条目,然后以原子方式加载新条目。
探索 BigQuery DML 我看到支持合并操作,但只能WHEN MATCHED THEN DELETE
或WHEN MATCHED THEN UPDATE
。
我对匹配时然后删除,然后是插入操作感兴趣。
我将如何在 BigQuery 中实现这样的 ETL,同时在数据可用性和正确性方面尽可能保持原子或最终一致性。
编辑1:我想提供一个具体的例子来详细说明。
我在这个数据集上的唯一性的最低粒度是user_id
。行不能唯一标识。
例
1.
从 mongo 更改流收到的更新用户对象:
user={_id: "3", name="max", registered="2018-07-05" q=["a", "b", "c"]}
阿拉伯数字。
当前BigQuery.user_q保留
| user_id | q |
...
| 3 | a |
| 3 | b |
...
3.
转换代码将修改的用户对象加载到BigQuery.user_q_incoming
| user_id | q |
| 3 | a |
| 3 | b |
| 3 | c |
4.
合并user_q
和user_q_incoming
:
user_q
中属于user_id 3
的 2 行被删除user_q_incoming
中插入属于user_id 3
的 3 行。user_q
中的其余数据(...
(保留在原地,未修改。
5.
BigQuery.user_q保留
| user_id | q |
...
| 3 | a |
| 3 | b |
| 3 | c |
...
例如,用户可能会从其个人资料中删除问题。将其余行保留为q=["a", "c"]
.我也需要这个来转化为 BigQuery 世界观。
INSERT 由 BigQuery DML 支持
MERGE 语句是一个 DML 语句,它可以将 INSERT、UPDATE 和 DELETE 操作组合到单个语句中,并以原子方式执行这些操作。
例如
MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON FALSE
WHEN NOT MATCHED AND product LIKE '%washer%' THEN
INSERT (product, quantity) VALUES(product, quantity)
WHEN NOT MATCHED BY SOURCE AND product LIKE '%washer%' THEN
DELETE
所以,你应该很高兴使用你的ETL
根据添加到问题的更具体的详细信息进行编辑
好的,我明白了 - 我认为在这种情况下,合并将不适用,因为插入只能用于非匹配子句。在这种情况下,有人可能会弄清楚如何欺骗 MERGE 工作,但与此同时,下面的解决方案可以满足您的目标 - 我认为是这样:o(
CREATE OR REPLACE TABLE `project.dataset.user_q` (user_id INT64, q STRING) AS
SELECT * FROM `project.dataset.user_q`
WHERE NOT user_id IN (SELECT DISTINCT user_id FROM `project.dataset.user_q_incoming`)
UNION ALL
SELECT * FROM `project.dataset.user_q_incoming`
WHERE user_id IN (SELECT DISTINCT user_id FROM `project.dataset.user_q`)
有一个类似的问题和一个使 MERGE 工作的走动 (https://issuetracker.google.com/issues/35905927#comment9(。
基本上,像下面这样的东西应该有效,
MERGE `project.dataset.user_q` T
USING (
SELECT *, false AS is_insert FROM `project.dataset.user_q_incoming`
UNION ALL
SELECT *, true AS is_insert FROM `project.dataset.user_q_incoming`
) S
ON T.user_id = S.user_id and NOT is_insert
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED AND is_insert THEN
INSERT(user_id, q) VALUES(user_id, q)
理想情况下,您需要以下内容,但尚不支持。
MERGE `project.dataset.user_q`
USING `project.dataset.user_q_incoming`
ON FALSE
WHEN NOT MATCHED BY TARGET THEN
INSERT(user_id, q) VALUES(user_id, q)
WHEN NOT MATCHED BY SOURCE AND user_id in (SELECT user_id FROM `project.dataset.user_q_incoming`) THEN
DELETE