Invalid type when writing to BigQuery from the Apache Beam Go SDK



When I try to write to BigQuery with the Apache Beam Go SDK, I get:

panic: element type is struct { TaskId string; Connector string; ... },
want struct { TaskId string "json:"task_id" bigquery:"task_id""; Connector string "json:"connector" bigquery:"connector""; ... }

Here is my code:

package main

import (
	"context"
	"encoding/json"
	"flag"
	"time"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
)

type (
	// ProcessedTask is one row of the target BigQuery table.
	ProcessedTask struct {
		TaskId    string `json:"task_id" bigquery:"task_id"`
		Connector string `json:"connector" bigquery:"connector"`
		// ... other fields
	}
)

func buildPipeline(s beam.Scope) {
	// Read raw JSON messages from Pub/Sub.
	rawProcessedTasks := pubsubio.Read(s, "project", "topic", &pubsubio.ReadOptions{Subscription: "subscription"})

	// Decode each message into a ProcessedTask.
	processedTasks := beam.ParDo(s, func(ctx context.Context, data []byte) (ProcessedTask, error) {
		var task ProcessedTask
		if err := json.Unmarshal(data, &task); err != nil {
			log.Error(ctx, err)
			return task, err
		}
		return task, nil
	}, rawProcessedTasks)

	debug.Printf(s, "Task : %#v", processedTasks)

	// Write the decoded rows to BigQuery.
	bigqueryio.Write(s, "project", "table", processedTasks)
}

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()
	buildPipeline(s)

	ctx := context.Background()
	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute pipeline: %v", err)
	}
}

Looking at the Dataflow logs, I see this output from debug.Printf:

Task : struct { TaskId string "json:"task_id" bigquery:"task_id""; Connector string "json:"connector" bigquery:"connector""; ... }
{TaskId:"342ebd19-4bd8-4271-9970-fd4615ddd725", Connector:"optin-to-mailjet", ...}}

Why do I lose the correct type at the bigqueryio.Write call?

I tried to register the Beam type, but it did not help. Do I have to declare the type explicitly somewhere?

I based this on an official example: https://github.com/apache/beam/blob/master/sdks/go/examples/cookbook/tornadoes/tornadoes.go

You need to register the type in an init function:

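// Requires "reflect" in the import list.
func init() {
	// Register the named struct type so the runner uses ProcessedTask,
	// tags included, instead of a reconstructed anonymous struct.
	beam.RegisterType(reflect.TypeOf((*ProcessedTask)(nil)).Elem())
}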
