在golang中使用BigQuery存储写入API时,BigQuery可以为null类型



我将从传统的流式API切换到存储写入API,下面是golang中的示例:https://github.com/alexflint/bigquery-storage-api-example

在旧代码中,我使用bigquery的null类型来指示字段可以为null:

type Person struct {
Name bigquery.NullString `bigquery:"name"`
Age  bigquery.NullInt64  `bigquery:"age"`
}
var persons = []Person{
{
Name: ToBigqueryNullableString(""), // this will be null in bigquery
Age:  ToBigqueryNullableInt64("20"),
},
{
Name: ToBigqueryNullableString("David"),
Age:  ToBigqueryNullableInt64("60"),
},
}
func main() {
ctx := context.Background()
bigqueryClient, _ := bigquery.NewClient(ctx, "project-id")

inserter := bigqueryClient.Dataset("dataset-id").Table("table-id").Inserter()
err := inserter.Put(ctx, persons)
if err != nil {
log.Fatal(err)
}
}
func ToBigqueryNullableString(x string) bigquery.NullString {
if x == "" {
return bigquery.NullString{Valid: false}
}
return bigquery.NullString{StringVal: x, Valid: true}
}
func ToBigqueryNullableInt64(x string) bigquery.NullInt64 {
if x == "" {
return bigquery.NullInt64{Valid: false}
}
if s, err := strconv.ParseInt(x, 10, 64); err == nil {
return bigquery.NullInt64{Int64: s, Valid: true}
}
return bigquery.NullInt64{Valid: false}
}

切换到新的API后:

var persons = []*personpb.Row{
{
Name: "",
Age: 20,
},
{
Name: "David",
Age: 60,
},
}
func main() {
ctx := context.Background()
client, _ := storage.NewBigQueryWriteClient(ctx)
defer client.Close()
stream, err := client.AppendRows(ctx)
if err != nil {
log.Fatal("AppendRows: ", err)
}
var row personpb.Row
descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
if err != nil {
log.Fatal("NormalizeDescriptor: ", err)
}
var opts proto.MarshalOptions
var data [][]byte
for _, row := range persons {
buf, err := opts.Marshal(row)
if err != nil {
log.Fatal("protobuf.Marshal: ", err)
}
data = append(data, buf)
}
err = stream.Send(&storagepb.AppendRowsRequest{
WriteStream: fmt.Sprintf("projects/%s/datasets/%s/tables/%s/streams/_default", "project-id", "dataset-id", "table-id"),
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
WriterSchema: &storagepb.ProtoSchema{
ProtoDescriptor: descriptor,
},
Rows: &storagepb.ProtoRows{
SerializedRows: data,
},
},
},
})
if err != nil {
log.Fatal("AppendRows.Send: ", err)
}
_, err = stream.Recv()
if err != nil {
log.Fatal("AppendRows.Recv: ", err)
}
}

使用新的API,我需要在.proto文件中定义类型,所以我需要使用其他东西来定义可以为null的字段,我尝试使用可选字段:

syntax = "proto3";
package person;
option go_package = "/personpb";
message Row {
optional string name = 1;
int64 age = 2;
}

但在尝试流式传输时(不是在编译时(,它会给我带来错误:BqMessage.proto: person_Row.Name: The [proto3_optional=true] option may only be set on proto3fields, not person_Row.Name

我尝试过的另一种选择是使用oneof,并像这样编写的原型文件

syntax = "proto3";
import "google/protobuf/struct.proto";
package person;
option go_package = "/personpb";
message Row {
NullableString name = 1;
int64 age = 2;
}
message NullableString {
oneof kind {
google.protobuf.NullValue null = 1;
string data = 2;
}
}

然后这样使用:

var persons = []*personpb.Row{
{
Name: &personpb.NullableString{Kind: &personpb.NullableString_Null{
Null: structpb.NullValue_NULL_VALUE,
}},
Age: 20,
},
{
Name: &personpb.NullableString{Kind: &personpb.NullableString_Data{
Data: "David",
}},
Age: 60,
},
}
...

但这给了我以下错误:Invalid proto schema: BqMessage.proto: person_Row.person_NullableString.null: FieldDescriptorProto.oneof_index 0 is out of range for type "person_NullableString".

我想,因为api不知道如何处理其中一种类型,所以我需要以某种方式告诉它这一点。

在使用新存储API时,如何使用类似bigquery.Nullable类型的内容?如有任何帮助,将不胜感激

查看此示例,以获取在go中使用proto2语法文件的端到端示例。

当使用存储API时,proto3仍然是一个有点特殊的野兽,原因有两个:

  • Storage API的当前行为是使用proto2语义进行操作
  • 目前,Storage API不理解包装器类型,这是proto3用来传递可选存在的原始方式(例如,BigQuery字段中的NULL(。正因为如此,它倾向于将包装器字段视为具有值字段的子消息(在BigQuery中,是具有单个叶字段的STRUCT(
  • 在其发展的后期,proto3重新引入了optional关键字作为标记存在的一种方式,但在内部表示中,这意味着添加另一个存在标记(您在后端错误中观察到的proto3_optional警告的来源(

看起来你使用了一些新的贴面,尤其是adapt.NormalizeDescriptor()。我怀疑,如果您使用的是这个模块,那么您可能使用的是该模块的旧版本,因为规范化代码在本PR中进行了更新,并作为bigquery/v1.33.0的一部分发布。

有一些工作可以改善存储API的体验,使整体体验更流畅,但仍有一些工作要做。

最新更新