我试图将字符串字段(date
)添加到用Go编写的Beam管道上的PCollection。这个字段的值最终将由传递给我的管道的参数决定,尽管现在我硬编码了一个用于测试目的的日期。
相关代码如下:
type AddStringDoFn struct {
StringToAdd string
}
func (d *AddStringDoFn) ProcessElement(x beam.X, st string) (string, beam.X) {
return st, x
}
然后,在管道中,我是这样调用函数的(为了测试的目的硬编码一个日期):
col_with_dates := beam.ParDo(scope, &AddStringDoFn{StringToAdd: "2023-01-01"}, col)
其中col
为输入PCollection。
当我运行这段代码,我得到一个错误:
panic: inserting ParDo in scope root
creating new DoFn in scope root
binding fn reflect.methodValueCall
binding params [{Value typex.X} {Value string}] to input [main.PrivateModel]
too few inputs: forgot an input or to annotate options?
这里缺少什么参数?还是我的狗畸形了?
type AddStringDoFn struct {
StringToAdd string
}
func (d *AddStringDoFn) ProcessElement(x beam.X) (string, beam.X) {
return d.StringToAdd, x
}
然后调用函数
col_with_dates := beam.ParDo(scope, &AddStringDoFn{StringToAdd: "2023-01-01"}, col, beam.TypeDefinition{Var: beam.XType, T: reflect.TypeOf("")})