Go SDK for Apache Beam: "singleton side input Singleton for int ill-defined"



Using the Go SDK for Apache Beam, I'm trying to use a side input to create a view of a PCollection.

But I get this strange error:

Failed to execute job: on ctx=      making side input 0:
singleton side input Singleton for int ill-defined
exit status 1

Here is the code I'm using:

// A PCollection of key/value pairs
pairedWithOne := beam.ParDo(s, func(r models.Review) (string, int) {
    return r.DoRecommend, 1
}, col)
// A PCollection of ints (demo)
pcollInts := beam.CreateList(s, [3]int{
    1, 2, 3,
})
// A PCollection of key/value pairs
summed := stats.SumPerKey(s, pairedWithOne)
// Here is where I'd like to use my side input.
mapped := beam.ParDo(s, func(k string, v int, side int, emit func(ratio models.RecommendRatio)) {
    var ratio = models.RecommendRatio{
        DoRecommend: k,
        NumVotes:    v,
    }
    emit(ratio)
}, summed, beam.SideInput{Input: pcollInts})

I found this example in the git repository:

// Side Inputs
//
// While a ParDo processes elements from a single "main input" PCollection, it
// can take additional "side input" PCollections. These SideInput along with
// the DoFn parameter form express styles of accessing PCollection computed by
// earlier pipeline operations, passed in to the ParDo transform using SideInput
// options, and their contents accessible to each of the DoFn operations. For
// example:
//
//     words := ...
//     cutoff := ...  // Singleton PCollection<int>
//     smallWords := beam.ParDo(s, func (word string, cutoff int, emit func(string)) {
//           if len(word) < cutoff {
//                emit(word)
//           }
//     }, words, beam.SideInput{Input: cutoff})

Update: it seems the Impulse(scope) function plays a role here, but I can't figure out what it is. From the GoDoc:

Impulse emits a single empty []byte into the global window. The resulting PCollection is a singleton of type []byte.
The purpose of Impulse is to trigger another transform, such as ones that take all information as side inputs.

In case it helps, here are my structs:

type Review struct {
    Date        time.Time `csv:"date" json:"date"`
    DoRecommend string    `csv:"doRecommend" json:"doRecommend"`
    NumHelpful  int       `csv:"numHelpful" json:"numHelpful"`
    Rating      int       `csv:"rating" json:"rating"`
    Text        string    `csv:"text" json:"text"`
    Title       string    `csv:"title" json:"title"`
    Username    string    `csv:"username" json:"username"`
}
type RecommendRatio struct {
    DoRecommend string `json:"doRecommend"`
    NumVotes    int    `json:"numVotes"`
}

Is there a solution?

Thanks

Update:

This can be simplified by removing the beam.Impulse() call (I think the wrong type was causing the trouble here):

mapped := beam.ParDo(s,
    func(k string, v int,
        sideCounted int,
        emit func(ratio models.RecommendRatio)) {
        p := percent.PercentOf(v, sideCounted)
        emit(models.RecommendRatio{
            DoRecommend: k,
            NumVotes:    v,
            Percent:     p,
        })
    }, summed,
    beam.SideInput{Input: counted})
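
A possible explanation for why this version runs while the one in the question fails: my understanding, which I have not checked against the SDK source, is that a side input bound to a plain value parameter such as `sideCounted int` must come from a PCollection holding exactly one element. `counted` (the global sum computed in the older pipeline below) has one element, whereas the demo `pcollInts` has three. The plain-Go sketch below only models that rule. `singleton` is a hypothetical helper, not a Beam function, and its error text is made up for illustration.

```go
package main

import "fmt"

// singleton is a hypothetical model of the rule described above: a side
// input bound to a plain value parameter is only well defined when the
// underlying collection holds exactly one element. This is NOT Beam code.
func singleton(elems []int) (int, error) {
    if len(elems) != 1 {
        return 0, fmt.Errorf("singleton side input ill-defined: got %d elements, want 1", len(elems))
    }
    return elems[0], nil
}

func main() {
    // Three elements, like pcollInts in the question: rejected.
    if _, err := singleton([]int{1, 2, 3}); err != nil {
        fmt.Println("error:", err)
    }
    // One element, like the result of a global sum: accepted.
    if v, err := singleton([]int{6}); err == nil {
        fmt.Println("value:", v)
    }
}
```

If that reading is right, a multi-element collection would have to be consumed through an iterator-style parameter instead, as `sideSummed` is in the older version below.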

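Old: It seems I have found a solution, or maybe just a workaround. I'm looking for a quick review and am open to improvements. (I believe the function is not idempotent: if it can be executed several times across multiple workers, the append() call would duplicate entries...)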

But the overall idea here is to use the beam.Impulse(scope) function to produce a singleton PCollection of []uint8 (bytes), and to pass all the "real" data in as side inputs.

// Pair each recommendation value with one -> PColl<KV<string, int>>
pairedWithOne := beam.ParDo(s, func(r models.Review) (string, int) {
    return r.DoRecommend, 1
}, col)
// Sum num occurrences of a recommendation k/v pair
summed := stats.SumPerKey(s, pairedWithOne)
// Drop keys for the later global count
droppedKey := beam.DropKey(s, pairedWithOne)
// Count globally the number of recommendation values -> PColl<int>
counted := stats.Sum(s, droppedKey)
// Map to a struct with percentage per ratio
mapped := beam.ParDo(s,
    func(_ []uint8,
        sideSummed func(k *string, v *int) bool,
        sideCounted int,
        emit func(ratio []models.RecommendRatio)) {
        var k string
        var v int
        var ratios []models.RecommendRatio

        // Drain the iterator-style side input, one key/value pair per call.
        for sideSummed(&k, &v) {
            p := percent.PercentOf(v, sideCounted)
            ratio := models.RecommendRatio{
                DoRecommend: k,
                NumVotes:    v,
                Percent:     p,
            }
            ratios = append(ratios, ratio)
        }
        emit(ratios)
    }, beam.Impulse(s),
    beam.SideInput{Input: summed},
    beam.SideInput{Input: counted})
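
To check the loop logic without running a pipeline, the same computation can be modelled in plain Go. This is only a sketch under assumptions: `iter` is a hypothetical helper that fakes an iterator with the same shape as `sideSummed`, `percentOf` is a stand-in for `percent.PercentOf` (its formula is my guess at what that function does), and the `float64` type of `Percent` is assumed.

```go
package main

import "fmt"

// RecommendRatio mirrors the struct used above; Percent as float64 is an assumption.
type RecommendRatio struct {
    DoRecommend string
    NumVotes    int
    Percent     float64
}

// kv is a hypothetical key/value pair standing in for one element of summed.
type kv struct {
    k string
    v int
}

// iter returns a function with the same shape as the sideSummed parameter:
// each call fills k and v with the next pair and reports whether one was available.
func iter(pairs []kv) func(k *string, v *int) bool {
    i := 0
    return func(k *string, v *int) bool {
        if i >= len(pairs) {
            return false
        }
        *k, *v = pairs[i].k, pairs[i].v
        i++
        return true
    }
}

// percentOf is a hypothetical stand-in for percent.PercentOf.
func percentOf(part, total int) float64 {
    return float64(part) * 100 / float64(total)
}

// ratios drains the iterator and builds one RecommendRatio per key,
// the same way the DoFn body above does.
func ratios(next func(*string, *int) bool, total int) []RecommendRatio {
    var k string
    var v int
    var out []RecommendRatio
    for next(&k, &v) {
        out = append(out, RecommendRatio{DoRecommend: k, NumVotes: v, Percent: percentOf(v, total)})
    }
    return out
}

func main() {
    summed := []kv{{"true", 3}, {"false", 1}} // made-up sample counts
    for _, r := range ratios(iter(summed), 4) {
        fmt.Printf("%s: %d votes (%.1f%%)\n", r.DoRecommend, r.NumVotes, r.Percent)
    }
}
```

Note that `ratios` starts from an empty slice on every call, so calling it again with a fresh iterator yields the same result; any duplication would have to come from the emitted slice being processed more than once downstream.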