我创建了一个 Google Cloud Platform 函数(Cloud Function),它监听一个 Pub/Sub 主题并将数据插入 BigQuery。代码"几乎"可以工作:insert 调用没有报告任何错误,但插入到 BigQuery 中的行所有列都为 null。
这是 Cloud Function 的代码,运行在 Node.js 6 运行时上,内存为 128 MB,由 Pub/Sub 触发。
我已经尝试了以下两个选项的所有取值组合;当 ignoreUnknownValues 设为 false 时,会得到两种不同的错误消息(见本文底部):
'ignoreUnknownValues':true, 'raw':false
package.json
{
"name": "sample-pubsub",
"version": "0.0.1",
"dependencies": {
"@google-cloud/bigquery": "^1.3.0"
}
}
函数体
/**
* Triggered from a message on a Cloud Pub/Sub topic.
*
* @param {!Object} event Event payload and metadata.
* @param {!Function} callback Callback function to signal completion.
*/
exports.helloPubSub = (event, callback) => {
const pubsubMessage = event.data;
console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
const BigQuery = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
bigquery
.dataset("init_data")
.table ("tronc_queteur")
.insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log(`Inserted 1 rows`);
console.log(data);
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:');
err.errors.forEach(err => console.error(err));
}
} else {
console.error('ERROR:', err);
}
});
callback();
};
传递给函数的数据如下(可从函数中第一个 console.log() 的输出看到):
**数据**
{
"id":9999,
"queteur_id":552,
"point_quete_id":49,
"tronc_id":281,
"depart_theorique":"2018-06-17 08:09:33",
"depart":"2018-06-17 08:09:33",
"retour":"2018-06-17 10:26:20",
"comptage":"2018-11-08 21:23:02",
"last_update":"2018-11-08 21:23:02",
"last_update_user_id":1,
"euro500":0,
"euro200":0,
"euro100":0,
"euro50":0,
"euro20":1,
"euro10":3,
"euro5":1,
"euro2":0,
"euro1":37,
"cents50":12,
"cents20":0,
"cents10":0,
"cents5":0,
"cents2":0,
"cent1":93,
"don_cheque":0,
"don_creditcard":0,
"foreign_coins":null,
"foreign_banknote":null,
"notes_depart_theorique":null,
"notes_retour":null,
"notes_retour_comptage_pieces":null,
"notes_update":null,
"deleted":false,
"coins_money_bag_id":"2018-PIECE-059",
"bills_money_bag_id":"2018-BILLET-013",
"don_cb_sans_contact_amount":0,
"don_cb_sans_contact_number":0,
"don_cb_total_number":0,
"don_cheque_number":0
}
这是我在 BigQuery 中创建表和加载数据时使用的表结构(schema):
**BigQuery表定义**
[
{"name": "id","type":"INTEGER"},
{"name": "queteur_id","type":"INTEGER"},
{"name": "point_quete_id","type":"INTEGER"},
{"name": "tronc_id","type":"INTEGER"},
{"name": "depart_theorique","type":"STRING"},
{"name": "depart","type":"STRING"},
{"name": "retour","type":"STRING"},
{"name": "comptage","type":"STRING"},
{"name": "last_update","type":"STRING"},
{"name": "last_update_user_id","type":"INTEGER"},
{"name": "euro500","type":"INTEGER"},
{"name": "euro200","type":"INTEGER"},
{"name": "euro100","type":"INTEGER"},
{"name": "euro50","type":"INTEGER"},
{"name": "euro20","type":"INTEGER"},
{"name": "euro10","type":"INTEGER"},
{"name": "euro5","type":"INTEGER"},
{"name": "euro2","type":"INTEGER"},
{"name": "euro1","type":"INTEGER"},
{"name": "cents50","type":"INTEGER"},
{"name": "cents20","type":"INTEGER"},
{"name": "cents10","type":"INTEGER"},
{"name": "cents5","type":"INTEGER"},
{"name": "cents2","type":"INTEGER"},
{"name": "cent1","type":"INTEGER"},
{"name": "foreign_coins","type":"INTEGER"},
{"name": "foreign_banknote","type":"INTEGER"},
{"name": "notes_depart_theorique","type":"STRING"},
{"name": "notes_retour","type":"STRING"},
{"name": "notes_retour_comptage_pieces","type":"STRING"},
{"name": "notes_update","type":"STRING"},
{"name": "deleted","type":"INTEGER"},
{"name": "don_creditcard","type":"FLOAT"},
{"name": "don_cheque","type":"FLOAT"},
{"name": "coins_money_bag_id","type":"STRING"},
{"name": "bills_money_bag_id","type":"STRING"},
{"name": "don_cb_sans_contact_amount","type":"FLOAT"},
{"name": "don_cb_sans_contact_number","type":"INTEGER"},
{"name": "don_cb_total_number","type":"INTEGER"},
{"name": "don_cheque_number","type":"INTEGER"}
]
使用 "ignoreUnknownValues":false, "raw":false 时的错误日志:
severity: "ERROR"
textPayload: "{ errors: [ { message: 'no such field.', reason: 'invalid' } ],
row:
{ '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
attributes: { location: 'Detroit' },
data: 'eyJpZCI6OT...'
其中 data 字段经 base64 解码后的内容如下:
{"id":9999,"queteur_id":552,"point_quete_id":49,"tronc_id":281,"depart_theorique":"2018-06-17 08:09:33","depart":"2018-06-17 08:09:33","retour":"2018-06-17 10:26:20","comptage":"2018-11-08 22:18:59","last_update":"2018-11-08 22:18:59","last_update_user_id":1,"euro500":0,"euro200":0,"euro100":0,"euro50":0,"euro20":1,"euro10":3,"euro5":1,"euro2":0,"euro1":37,"cents50":12,"cents20":0,"cents10":0,"cents5":0,"cents2":0,"cent1":93,"don_cheque":0,"don_creditcard":0,"foreign_coins":null,"foreign_banknote":null,"notes_depart_theorique":null,"notes_retour":null,"notes_retour_comptage_pieces":null,"notes_update":null,"deleted":false,"coins_money_bag_id":"2018-PIECE-059","bills_money_bag_id":"2018-BILLET-013","don_cb_sans_contact_amount":0,"don_cb_sans_contact_number":0,"don_cb_total_number":0,"don_cheque_number":0}
使用 "ignoreUnknownValues":false, "raw":true 时的错误日志:
错误消息:
textPayload: "{ errors: [ { message: '', reason: 'invalid' } ],
row:
{ '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
attributes: { location: 'Detroit' },
data: 'eyJpZCI6O...'
data 字段是完全相同的有效载荷(与上面 base64 解码后的内容相同)。
**在 BigQuery 端**
每次测试后,下面这个查询返回的行数都会增加,而这些行的每一列都是 null。
查询语句如下:
-- Select the rows whose id column is NULL; in this table those are the
-- rows inserted with every column null.
select *
from `init_data.tronc_queteur` as tq
where tq.id is null
结果如下:
Row id queteur_id point_quete_id tronc_id depart_theorique depart retour comptage last_update last_update_user_id euro500 euro200 euro100 euro50 euro20 euro10 euro5 euro2 euro1 cents50 cents20 cents10 cents5 cents2 cent1 foreign_coins foreign_banknote notes_depart_theorique notes_retour notes_retour_comptage_pieces notes_update deleted don_creditcard don_cheque coins_money_bag_id bills_money_bag_id don_cb_sans_contact_amount don_cb_sans_contact_number don_cb_total_number don_cheque_number
1 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
2 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
3 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
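问题出在向 BigQuery 的 insert 函数传递数据的方式上。
在 .insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false}) 这一行中,你传入的是整个 Pub/Sub 消息对象,其中的业务数据仍是 base64 编码的字符串。BigQuery 库需要的是一个 JSON 对象(在你的场景中),其键名要与表的列名对应。
由于消息对象的键('@type'、'attributes'、'data')与任何列都不匹配,启用 ignoreUnknownValues 时它们被直接忽略,于是插入的所有列都是 null
值。你必须先将 data 字段从 base64 解码为字符串,
再将其解析为 JSON 对象。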
我得到的可以正常工作的 insert 调用如下:
// Decode the base64 `data` field and parse it as JSON inside the promise
// chain. A malformed or non-JSON payload is then logged by the .catch below
// instead of throwing synchronously. The parsed object's keys map directly
// to the table's column names.
Promise.resolve()
  .then(() => JSON.parse(Buffer.from(pubsubMessage.data, 'base64').toString()))
  .then((row) => bigquery
    .dataset("init_data")
    .table("tronc_queteur")
    .insert(row, {'ignoreUnknownValues':true, 'raw':false}))
  .then((data) => {
    console.log(`Inserted 1 rows`);
    console.log(data);
  })
  .catch((err) => {
    if (err && err.name === 'PartialFailureError') {
      if (err.errors && err.errors.length > 0) {
        console.log('Insert errors:');
        err.errors.forEach((rowError) => console.error(rowError));
      }
    } else {
      console.error('ERROR:', err);
    }
  });
这样可以工作,但我对这段代码并不完全满意。我会进一步研究 Cloud Functions 是如何接收 Pub/Sub 消息的,如有相关发现,再来更新这个答案。
我认为您需要在package.json文件中列出pub/sub依赖项,如下所示:
{
"name": "sample-pubsub",
"version": "0.0.1",
"dependencies": {
"@google-cloud/pubsub": "^0.18.0",
"@google-cloud/bigquery": "^4.1.1"
}
}
我自己也好几次漏掉了它,实在太容易忘了!
希望这能解决你的问题!如果没有,请告诉我!