I'm using a Cloud Function to run queries across a partitioned table in BigQuery. The table is 1GB in size.
The script pulls all the rows matching an id and streams each result set into a Google Cloud Storage bucket. It does this for every id in the table (roughly 100 of them). The script works, but it runs slowly and only gets through nine merchants before the Cloud Function runs out of memory. Increasing the memory allocation in the build menu has no effect.
Is there a way to speed this up and reduce the amount of memory it uses on each pass?
const { BigQuery } = require("@google-cloud/bigquery");
const { Storage } = require("@google-cloud/storage");
const bucketName = "xxxx";
const stream = require("stream");
const { parse, Parser } = require("json2csv");
const bigquery = new BigQuery();
const storage = new Storage();
const fields = [
  "id",
  "product_name",
  "product_desc",
  "etc"
];
exports.importBQToGCS = (req, res) => {
  "use strict";
  const liveMerchantCount = 113;
  (async function () {
    try {
      for (let i = 1; i < liveMerchantCount; i += 1) {
        // Escape the backticks around the table name so they don't terminate
        // the template literal.
        const query = `
          SELECT *
          FROM \`table_name\`
          WHERE id_number = ${i}`;
        const options = {
          query: query,
          location: "EU",
        };
        const [job] = await bigquery.createQueryJob(options);
        console.log(`Job ${job.id} started.`);
        const createFile = storage.bucket(bucketName).file(`test_${i}.csv`);
        // Pulls the merchant's entire result set into memory before
        // converting it to CSV.
        const [rows] = await job.getQueryResults();
        const csv = parse(rows, { fields });
        const dataStream = new stream.PassThrough();
        dataStream.push(csv);
        dataStream.push(null);
        await new Promise((resolve, reject) => {
          console.log("Writing to GCS");
          dataStream
            .pipe(
              createFile.createWriteStream({
                resumable: false,
                validation: false,
                metadata: { "Cache-Control": "public, max-age=31536000" },
              })
            )
            .on("error", (error) => {
              console.error("Stream failed", error);
              reject(error);
            })
            .on("finish", () => {
              resolve(true);
            });
        });
      }
      res.status(200).send();
    } catch (err) {
      res.send(err);
    }
  })();
};
On further testing, the problem appears to be that BigQuery returns a particularly large JSON result (250MB) for one merchant, and that is what exhausts the function's memory. Everything else is under 100MB. Once I exclude that particular query, the function works as expected.
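One way to cap peak memory while keeping the standard @google-cloud/bigquery client is to page through the job's results instead of materializing the whole result set at once. Below is a minimal, untested sketch of the body of the loop rewritten that way; it reuses bigquery, storage, bucketName, fields, options, and the loop index i from the code above, assumes the client's manual pagination (autoPaginate: false plus the returned next-page query) behaves as documented, and the 10,000-row page size is an arbitrary choice.

// Sketch only: fetch the results one page at a time so that at most one page
// of rows (here 10,000, an arbitrary choice) is held in memory at once.
const [job] = await bigquery.createQueryJob(options);
const file = storage.bucket(bucketName).file(`test_${i}.csv`);
const gcsStream = file.createWriteStream({ resumable: false, validation: false });

let pageQuery = { autoPaginate: false, maxResults: 10000 };
let firstPage = true;
while (pageQuery) {
  // With autoPaginate disabled, getQueryResults also returns the query for
  // the next page; it is null once every page has been consumed.
  const [rows, nextQuery] = await job.getQueryResults(pageQuery);
  // Only emit the CSV header on the first page.
  const csv = parse(rows, { fields, header: firstPage });
  gcsStream.write(csv + "\n");
  firstPage = false;
  pageQuery = nextQuery;
}

await new Promise((resolve, reject) => {
  gcsStream.on("finish", resolve).on("error", reject);
  gcsStream.end();
});

This trades one large in-memory CSV per merchant for many small writes to the same GCS file, so memory use depends on the page size rather than on how big any single merchant's result set is.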
You can reduce the memory footprint by streaming the records out of BigQuery. If you use the @databases client, it has a queryNodeStream
method that is built for exactly this use case: https://www.atdatabases.org/docs/bigquery-client#bigqueryclient-querynodestream
// @databases/bigquery exports the connect function as the module itself and
// `sql` as a named export, so require it in two steps.
const connectBigQuery = require("@databases/bigquery");
const { sql } = require("@databases/bigquery");
const { Storage } = require("@google-cloud/storage");
const bucketName = "xxxx";
const { parse, Parser } = require("json2csv");
const bigquery = connectBigQuery();
const storage = new Storage();
const fields = [
  "id",
  "product_name",
  "product_desc",
  "etc"
];
exports.importBQToGCS = (req, res) => {
  "use strict";
  const liveMerchantCount = 113;
  (async function () {
    try {
      for (let i = 1; i < liveMerchantCount; i += 1) {
        const createFile = storage.bucket(bucketName).file(`test_${i}.csv`);
        await new Promise((resolve, reject) => {
          console.log("Writing to GCS");
          bigquery.queryNodeStream(sql`
            SELECT * FROM table_name WHERE id_number = ${i}
          `)
            .on("error", reject)
            // Placeholder: any streaming JSON-to-CSV transform works here
            // (for example json2csv's Transform in object mode).
            .pipe(someStreamingCsvLibrary({ fields }))
            .on("error", reject)
            .pipe(
              createFile.createWriteStream({
                resumable: false,
                validation: false,
                metadata: { "Cache-Control": "public, max-age=31536000" },
              })
            )
            .on("error", (error) => {
              console.error("Stream failed", error);
              reject(error);
            })
            .on("finish", () => {
              resolve(true);
            });
        });
      }
      res.status(200).send();
    } catch (err) {
      res.send(err);
    }
  })();
};