以下使用 BigQuery 的 go 客户端库的代码返回错误
type GceQuotaRow struct {
CustomerProjectNumber int64
ExecutionProjectNumber int64
Region string
Metric string
Limit float64
}
ds := b.bq.Dataset(task.Options.Dataset)
table := ds.Table(task.Options.Table)
u := table.Uploader()
rows := []bigquery.StructSaver{}
// rows := []*GceQuotaRow{}
schema, err := bigquery.InferSchema(GceQuotaRow{})
if err != nil {
log.Errorf("Inferring BigQuery scheme failed; %v", util.Pformat(err))
return backfill.TaskPermanentError
}
// Construct an insertID to dedupe insertions on a best effort basis.
for _, region := range res.GetRegionalQuotas() {
for _, q := range region.GetQuotas() {
r := &GceQuotaRow{
CustomerProjectNumber: task.CustomerProjectNumber,
ExecutionProjectNumber: res.GetExecutionProjectNumber(),
Region: region.GetRegion(),
Metric: q.GetMetric(),
Limit: q.GetLimit(),
}
h := sha256.New()
h.Write([]byte(util.Pformat(r)))
// Use the sha256 of the row as the InsertId to avoid duplicates.
insertId := fmt.Sprintf("%x", h.Sum(nil))
rows = append(rows, bigquery.StructSaver{
Schema: schema,
InsertID: insertId,
Struct: r,
})
// rows = append(rows, r)
}
}
err = u.Put(ctx, rows)
if err != nil {
putErr, isErr := err.(bigquery.PutMultiError)
if isErr {
for _, e := range putErr {
log.Errorf("There was a problem writing Row for customer project: %v, Error: %v", task.CustomerProjectNumber, util.Pformat(e))
}
} else {
log.Errorf("There was a problem writing GCE Quota Rows for customer project: %v, Error: %v", task.CustomerProjectNumber, err)
}
}
对 Put 的调用返回错误:
Error: bigquery: schema inference for recursive type *bigquery.FieldSchema
我不明白为什么 Put 会返回有关推断架构的错误,因为 InferSchema 调用成功,因此应该将架构传递给结构保护程序。
我想使用 StructSaver,以便我可以提供一个插入 ID 来重复数据删除行。
我只是遇到了这个问题并困惑了。发生这种情况的原因是具体bigquery.StructSaver
没有实现bigquery.ValueSaver
接口,因为它的方法在指针类型上运行*bigquery.StructSaver
.这会导致 golang 客户端库回退到结构上的通用架构推理逻辑(在本例中为bigquery.StructSaver
(,然后失败,因为bigquery.StructSaver
嵌套其他结构,因此本身不是保存在 BigQuery 中的有效对象。您可以按如下方式修复代码:
for _, region := range res.GetRegionalQuotas() {
for _, q := range region.GetQuotas() {
r := &GceQuotaRow{
CustomerProjectNumber: task.CustomerProjectNumber,
ExecutionProjectNumber: res.GetExecutionProjectNumber(),
Region: region.GetRegion(),
Metric: q.GetMetric(),
Limit: q.GetLimit(),
}
h := sha256.New()
h.Write([]byte(util.Pformat(r)))
// Use the sha256 of the row as the InsertId to avoid duplicates.
insertId := fmt.Sprintf("%x", h.Sum(nil))
rows = append(rows, &bigquery.StructSaver{
Schema: schema,
InsertID: insertId,
Struct: r,
})
// rows = append(rows, r)
}
}
err = u.Put(ctx, rows)
if err != nil {
putErr, isErr := err.(bigquery.PutMultiError)
if isErr {
for _, e := range putErr {
log.Errorf("There was a problem writing Row for customer project: %v, Error: %v", task.CustomerProjectNumber, util.Pformat(e))
}
} else {
log.Errorf("There was a problem writing GCE Quota Rows for customer project: %v, Error: %v", task.CustomerProjectNumber, err)
}
}
我希望这有所帮助(也许如果不是你,在事实发生两年后,那么其他任何偶然发现这个的人(!