如何等待频道活动的间歇触发某些内容



我有一个通道,可以接收对它的突发写入。我想等到通道上的突发发送完成,然后再触发操作。

我已经看过这个要点,但是,如果缓冲区中有数据,它将每interval发送一次输出:

func debounceChannel(interval time.Duration, output chan int) chan int {
  input := make(chan int)
  go func() {
    var buffer int
    var ok bool
    // We do not start waiting for interval until called at least once
    buffer, ok = <-input 
    // If channel closed exit, we could also close output
    if !ok {
      return
    }
    // We start waiting for an interval
    for {
      select {
      case buffer, ok = <-input:
        // If channel closed exit, we could also close output
        if !ok {
          return
        }
      case <-time.After(interval):
        // Interval has passed and we have data, so send it
        output <- buffer
        // Wait for data again before starting waiting for an interval
        buffer, ok = <-input
        if !ok {
          return
        }
        // If channel is not closed we have more data and start waiting for interval
      }
    }
  }()
  return input
}

就我而言,我想等到输入通道上不再发送此突发的任何数据,然后再触发或发送输出。

我如何实现这一点?

听起来你需要在goroutines之间进行同步,也许沿着这条线。

func main() {
        // Create a channel for our input
        input := make(chan int, 1)
        // Create another for synchronization between main and forked goroutines
        done := make(chan bool)
        go func() {
                // block-wait for received value
                <-input
                // do some more things here
                // when done, send signal to the main goroutine
                done <- true
        }()
        // Do something while wait for the forked goroutine
        // this block until `<-done`
        <-done
        close(mychan)
}

这篇文章清楚地解释了使用频道和同步组进行同步。

这是我最终作为我的去保镖实现的:

func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {
    go func() {
        var last int64 = 0
        for {
            select {
            case <-in:
                last = time.Now().Unix()
            case <-time.Tick(lull):
                if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
                    last = 0
                    out <- struct{}{}
                }
            }
        }
    }()
}

它需要一个间歇期,即如果我们没有收到输入的持续时间,那么我们假设数据突发中断。有2个通道,1个输入和1个输出。数据突发到达输入,对于每个突发,我们在突发结束时写入输出通道。

实现非常简单。我只是在每次从输入通道接收时存储当前的 unix 时间戳。然后,我有一个滴答作响的股票代码,其中包含平静时间的持续时间。所有这些功能都是检查我们是否超过了上次突发的等待时间。如果是这样,它将last重置为 0,并在输出通道上发出事件。

下面是一些使用去抖动函数的代码,其间歇时间为 2 秒,在输入通道上发送随机突发:

func main() {
    out := make(chan struct{})
    in := make(chan struct{})
    Debounce(2*time.Second, in, out)
    // Generating bursts of input data
    go func(in chan struct{}) {
        for {
            select {
            case <-time.Tick(1 * time.Second):
                in <- struct{}{}
                fmt.Println("Sending!")
                shouldSleep := rand.Intn(2)
                if shouldSleep == 1 {
                    time.Sleep(5 * time.Second)
                }
            }
        }
    }(in)
    // Listening for output events
    go func(out chan struct{}) {
        for _ = range out {
            fmt.Println("Got an event!")
        }
    }(out)
    // Do not let main terminate.
    done := make(chan struct{})
    <-done
}

我用作去保镖的内容:

package pkg
import (
    "context"
    "time"
)
// Debounce takes a channel, and will notify the output channel with the last received message after a lull duration.
// Upon cancel, it will check one last time for a message.
// Cancelling the ctx will cause the goroutine to exit.
func Debounce[T any](ctx context.Context, lull time.Duration, input <-chan T, output chan<- T) {
    
    go func() {
        var (
            buffer   *T
            minTimer = time.NewTimer(min)
            flush    = func() {
                if buffer != nil {
                    // don't block if we couldn't send
                    select {
                    case output <- *buffer:
                    default:
                    }
                    buffer = nil
                }
            }
        )
        defer minTimer.Stop()
        hits := 0
        for {
            select {
            case <-ctx.Done():
                // try and get last message
                select {
                case tmpBuf, ok := <-input:
                    if !ok {
                        break
                    }
                    buffer = &tmpBuf
                default:
                }
                flush()
                return
            case tmpBuf, ok := <-input:
                if !ok {
                    flush()
                    return
                }
                hits++
                buffer = &tmpBuf
            case <-minTimer.C:
                flush()
                minTimer.Reset(min)
            }
        }
    }()
}

最新更新