我有一个通道,可以接收对它的突发写入。我想等到通道上的突发发送完成,然后再触发操作。
我已经看过这个要点,但是,如果缓冲区中有数据,它将每interval
发送一次输出:
func debounceChannel(interval time.Duration, output chan int) chan int {
input := make(chan int)
go func() {
var buffer int
var ok bool
// We do not start waiting for interval until called at least once
buffer, ok = <-input
// If channel closed exit, we could also close output
if !ok {
return
}
// We start waiting for an interval
for {
select {
case buffer, ok = <-input:
// If channel closed exit, we could also close output
if !ok {
return
}
case <-time.After(interval):
// Interval has passed and we have data, so send it
output <- buffer
// Wait for data again before starting waiting for an interval
buffer, ok = <-input
if !ok {
return
}
// If channel is not closed we have more data and start waiting for interval
}
}
}()
return input
}
就我而言,我想等到输入通道上不再发送此突发的任何数据,然后再触发或发送输出。
我如何实现这一点?
听起来你需要在goroutines之间进行同步,也许沿着这条线。
func main() {
// Create a channel for our input
input := make(chan int, 1)
// Create another for synchronization between main and forked goroutines
done := make(chan bool)
go func() {
// block-wait for received value
<-input
// do some more things here
// when done, send signal to the main goroutine
done <- true
}()
// Do something while wait for the forked goroutine
// this block until `<-done`
<-done
close(mychan)
}
这篇文章清楚地解释了使用频道和同步组进行同步。
这是我最终作为我的去保镖实现的:
func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {
go func() {
var last int64 = 0
for {
select {
case <-in:
last = time.Now().Unix()
case <-time.Tick(lull):
if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
last = 0
out <- struct{}{}
}
}
}
}()
}
它需要一个间歇期,即如果我们没有收到输入的持续时间,那么我们假设数据突发中断。有2个通道,1个输入和1个输出。数据突发到达输入,对于每个突发,我们在突发结束时写入输出通道。
实现非常简单。我只是在每次从输入通道接收时存储当前的 unix 时间戳。然后,我有一个滴答作响的股票代码,其中包含平静时间的持续时间。所有这些功能都是检查我们是否超过了上次突发的等待时间。如果是这样,它将last
重置为 0,并在输出通道上发出事件。
下面是一些使用去抖动函数的代码,其间歇时间为 2 秒,在输入通道上发送随机突发:
func main() {
out := make(chan struct{})
in := make(chan struct{})
Debounce(2*time.Second, in, out)
// Generating bursts of input data
go func(in chan struct{}) {
for {
select {
case <-time.Tick(1 * time.Second):
in <- struct{}{}
fmt.Println("Sending!")
shouldSleep := rand.Intn(2)
if shouldSleep == 1 {
time.Sleep(5 * time.Second)
}
}
}
}(in)
// Listening for output events
go func(out chan struct{}) {
for _ = range out {
fmt.Println("Got an event!")
}
}(out)
// Do not let main terminate.
done := make(chan struct{})
<-done
}
我用作去保镖的内容:
package pkg
import (
"context"
"time"
)
// Debounce takes a channel, and will notify the output channel with the last received message after a lull duration.
// Upon cancel, it will check one last time for a message.
// Cancelling the ctx will cause the goroutine to exit.
func Debounce[T any](ctx context.Context, lull time.Duration, input <-chan T, output chan<- T) {
go func() {
var (
buffer *T
minTimer = time.NewTimer(min)
flush = func() {
if buffer != nil {
// don't block if we couldn't send
select {
case output <- *buffer:
default:
}
buffer = nil
}
}
)
defer minTimer.Stop()
hits := 0
for {
select {
case <-ctx.Done():
// try and get last message
select {
case tmpBuf, ok := <-input:
if !ok {
break
}
buffer = &tmpBuf
default:
}
flush()
return
case tmpBuf, ok := <-input:
if !ok {
flush()
return
}
hits++
buffer = &tmpBuf
case <-minTimer.C:
flush()
minTimer.Reset(min)
}
}
}()
}