如何使用通道在 golang 管道阶段批量项目?



我正在在线阅读管道教程,并尝试构建一个像这样操作的阶段 -

  1. 传入事件分批进行批处理,每批 10 个,然后再将它们发送到外部 chan
  2. 如果我们在 5 秒内没有看到 10 个事件,请合并我们收到的尽可能多的事件并发送它们,关闭 chan 并返回。

但是,我不知道第一个选择的案例会是什么样子。尝试了多种方法,但无法克服这一点。 任何指示都非常感谢!

func BatchEvents(inChan <- chan *Event) <- chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for event := range inChan {
select {
case -WHAT GOES HERE?-:
if i < batchSize {
comboEvent.data = append(comboEvent.data, event.data)
i++;
} else {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i=0;
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
out <- &comboEvent
// stop after
return
}
}
}()
return out
}

您的第一个选择情况应该来自该通道,而不是在通道上做一个范围,整个事情都在无限循环中。

func BatchEvents(inChan <-chan *Event) <-chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for {
select {
case event, ok := <-inChan:
if !ok {
return
}
comboEvent.data = append(comboEvent.data, event.data)
i++
if i == batchSize {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i = 0
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
if i > 0 {
out <- &comboEvent
}
// stop after
return
}
}
}()
return out
}

最新更新