我的工作流程
-
数据使用云函数从Pub/Sub流式传输到BigQuery。
-
数据在流缓冲区中停留90分钟,因此我无法执行Update语句。
-
我需要在此之前更新"结果"列。
请帮忙。
我在">Pub/Sub";那么一个">云功能";它将数据插入"0"中BigQuery";
这是代码:
const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
exports.telemetryToBigQuery = (data, context) => {
if (!data.data) {
throw new Error('No telemetry data was provided!');
return;
}
//Data comes in as base64
console.log(`raw data: ${data.data}`);
//Data gets decoded from base64 to string
const dataDataDecode = Buffer.from(data.data, 'base64').toString();
var indexesSemicolons = [];
for (var i = 0; i < dataDataDecode.length; i++) {
if (dataDataDecode[i] === ";") {
indexesSemicolons.push(i);
}
}
if (indexesSemicolons.length == 14) {
const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);
async function insertRowsAsStream() {
// Inserts the JSON objects into my_dataset:my_table.
const datasetId = 'put your dataset here';
const tableId = 'put table id here';
const rows = [
{
Brand: brand,
Model: model,
Result: result
}
];
// Insert data into a table
await bigquery
.dataset(datasetId)
.table(tableId)
.insert(rows);
console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();
} else {
console.log("Invalid message");
return;
}
}
这些数据在BigQuery流缓冲区中停留了大约90分钟,但我需要执行一个更新查询来更改Result列。这是不允许的,并导致错误
ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError
我需要一种在90分钟缓冲时间之前更新结果的方法。你们能帮帮我吗。
我在线阅读了以下页面
BigQuery流的使用寿命
我读到了以下问题的答案,我想我理解他在说什么,但我不知道如何执行
如果我是对的,他说把我的数据流到一个临时表,然后从那里把它放进一个永久表。
堆栈溢出DML更新bigQuery
是的,没错。当数据流式传输时,不能使用DML。解决方案是查询流缓冲区中的数据,并将它们转换到另一个表中。正如你所说,这可能是暂时的,并将它们放在一张永久的桌子上。
您还可以考虑来自PubSub的流式数据是原始数据,并且您希望保留它们,然后您需要在另一个表中细化数据。这也是一种常见的数据工程模式,具有不同的过滤层和转换层,直到最终有用的数据(也称为数据集市(
回答您的问题。是的,它说您应该将数据流式传输到一个临时表,然后将其复制到另一个永久表,在原始表中您可以启用过期时间。这意味着该表将在过期时间过后被删除。
您可以更改过滤器,使其不包括可能在当前流缓冲区中的数据。如果在更新数据时使用分区表,则可以添加WHERE
子句,其中时间戳的间隔为40到90分钟,如:
WHERE Partitiontime < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE).
我现在使用的是BigQuery创建作业方法。
这里的例子
我直接将数据放在表中,这样我就不必等待90分钟来获得流缓冲区。