如何始终从Go频道获取最新价值



我从Go开始,现在我正在编写一个简单的程序,从传感器中读取数据,并将其放入通道中进行一些计算

package main
import (
"fmt"
"time"
"strconv"
)
func get_sensor_data(c chan float64) {
time.Sleep(1 * time.Second)  // wait a second before sensor data starts pooring in
c <- 2.1  // Sensor data starts being generated
c <- 2.2
c <- 2.3
c <- 2.4
c <- 2.5
}
func main() {
s := 1.1
c := make(chan float64)
go get_sensor_data(c)
for {
select {
case s = <-c:
fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
default:
// no new values in the channel
}
fmt.Println(s)
time.Sleep(500 * time.Millisecond)  // Do heavy "work"
}
}

这很好,但传感器会生成大量数据,我总是只对最新的数据感兴趣。然而,使用这种设置,它只会在每个循环中读取下一个项目,这意味着如果通道在某个点包含20个值,则最新的值只会在10秒后读取。

有没有办法让一个通道一次只包含一个值,这样我总是只得到我感兴趣的数据,并且通道不会使用不必要的内存(尽管内存是我最不担心的(?

通道最好被认为是队列(FIFO(。因此,你不能真的到处乱跑。然而,也有一些图书馆这样做:https://github.com/cloudfoundry/go-diodes是一个将覆盖旧数据的原子环形缓冲区。如果你愿意,你可以设置一个较小的尺寸。

尽管如此,听起来你并不需要一个队列(或环形缓冲区(。你只需要一个互斥:

type SensorData struct{
mu sync.RWMutex
last float64
}
func (d *SensorData) Store(data float64) {
mu.Lock()
defer mu.Unlock()
d.last = data
}
func (d *SensorData) Get() float64 {
mu.RLock()
defer mu.RUnlock()
return d.last
}

这使用了RWMutex,这意味着许多东西可以同时从中读取,而只有一个东西可以写入。它会像你说的那样存储一个条目。

否。通道是FIFO缓冲器,完全停止。这就是渠道的运作方式和唯一目的。如果您只想要最新的值,可以考虑只使用一个由互斥对象保护的变量;每当有新数据出现时,都要对其进行写入,每当您读取它时,您将始终读取最新的值。

频道有特定用途。您可能希望使用锁内的代码,并在设置新值时更新变量。

这样,累加器将始终获得最新的值。

您不能直接从一个通道获得,但每个值可以使用一个通道,并在有新值时得到通知:

package main
import (
"fmt"
"strconv"
"sync"
"time"
)
type LatestChannel struct {
n    float64
next chan struct{}
mu   sync.Mutex
}
func New() *LatestChannel {
return &LatestChannel{next: make(chan struct{})}
}
func (c *LatestChannel) Push(n float64) {
c.mu.Lock()
c.n = n
old := c.next
c.next = make(chan struct{})
c.mu.Unlock()
close(old)
}
func (c *LatestChannel) Get() (float64, <-chan struct{}) {
c.mu.Lock()
n := c.n
next := c.next
c.mu.Unlock()
return n, next
}
func getSensorData(c *LatestChannel) {
time.Sleep(1 * time.Second)
c.Push(2.1)
time.Sleep(100 * time.Millisecond)
c.Push(2.2)
time.Sleep(100 * time.Millisecond)
c.Push(2.3)
time.Sleep(100 * time.Millisecond)
c.Push(2.4)
time.Sleep(100 * time.Millisecond)
c.Push(2.5)
}
func main() {
s := 1.1
c := New()
_, hasNext := c.Get()
go getSensorData(c)
for {
select {
case <-hasNext:
s, hasNext = c.Get()
fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
default:
// no new values in the channel
}
fmt.Println(s)
time.Sleep(250 * time.Millisecond) // Do heavy "work"
}
}

如果您不需要关于新值的通知,您可以尝试在Golang中读取通道内部的通道模式。

试试这个包https://github.com/subbuv26/chanup

它允许生产者使用最新值更新频道,该值将替换最新值。并且生产不会被阻塞。(这样,过时的值就会被覆盖(。因此,在消费者方面,总是只有最新的商品才会被阅读。

import "github.com/subbuv26/chanup"
ch := chanup.GetChan()
_ := ch.Put(testType{
a: 10,
s: "Sample",
})
_ := ch.Update(testType{
a: 20,
s: "Sample2",
})
// Continue updating with latest values
...
...
// On consumer end
val := ch.Get()
// val contains latest value

还有另一种方法可以解决这个问题(技巧(

发件人工作速度更快:如果channel_length>1

go func() {
for {
msg:=strconv.Itoa(int(time.Now().Unix()))
fmt.Println("make: ",msg," at:",time.Now())
messages <- msg
if len(messages)>1{
//remove old message
<-messages
}
time.Sleep(2*time.Second)
}
}()

接收器工作较慢:

go func() {
for {
channLen :=len(messages)
fmt.Println("len is ",channLen)
fmt.Println("received",<-messages)
time.Sleep(10*time.Second)
}
}()

或者,我们可以从接收方删除旧消息(阅读信息如删除(

有一个优雅的仅通道解决方案。如果你可以再添加一个通道和goroutine,你可以引入一个无buferless通道和一个goroutine来尝试将通道中的最新值发送给它:

package main
import (
"fmt"
"time"
)
func wrapLatest(ch <-chan int) <-chan int {
result := make(chan int) // important that this one i unbuffered
go func() {
defer close(result)
value, ok := <-ch
if !ok {
return
}
LOOP:
for {
select {
case value, ok = <-ch:
if !ok {
return
}
default:
break LOOP
}
}
for {
select {
case value, ok = <-ch:
if !ok {
return
}
case result <- value:
if value, ok = <-ch; !ok {
return
}
}
}
}()
return result
}
func main() {
sendChan := make(chan int, 10) // may be buffered or not
for i := 0; i < 10; i++ {
sendChan <- i
}
go func() {
for i := 10; i < 20; i++ {
sendChan <- i
time.Sleep(time.Second)
}
close(sendChan)
}()
recvChan := wrapLatest(sendChan)
for i := range recvChan {
fmt.Println(i)
time.Sleep(time.Second * 2)
}
}

最新更新