为了账号安全,请及时绑定邮箱和手机立即绑定

如何始终从 Go 频道获取最新值?

如何始终从 Go 频道获取最新值?

Go
九州编程 2023-05-15 15:23:58
我从 Go 开始,现在我正在编写一个简单的程序,它从传感器中读取数据并将其放入通道中以使用它进行一些计算。我现在让它工作如下:package mainimport (    "fmt"    "time"    "strconv")func get_sensor_data(c chan float64) {    time.Sleep(1 * time.Second)  // wait a second before sensor data starts pooring in    c <- 2.1  // Sensor data starts being generated    c <- 2.2    c <- 2.3    c <- 2.4    c <- 2.5}func main() {    s := 1.1    c := make(chan float64)    go get_sensor_data(c)    for {        select {        case s = <-c:            fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))        default:            // no new values in the channel        }        fmt.Println(s)        time.Sleep(500 * time.Millisecond)  // Do heavy "work"    }}这很好用,但传感器会生成大量数据,而我总是只对最新数据感兴趣。然而,使用此设置,它只会在每个循环中读出下一个项目,这意味着如果某个点的通道包含 20 个值,则只会在 10 秒后读出最新值。有没有办法让一个通道一次总是只包含一个值,这样我总是只得到我感兴趣的数据,并且通道没有使用不必要的内存(尽管内存是我最不担心的)?
查看完整描述

5 回答

?
繁华开满天机

TA贡献1816条经验 获得超4个赞

最好将通道视为队列 (FIFO)。因此你不能真的跳过。然而,有些库可以做这样的事情:https://github.com/cloudfoundry/go-diodes是一个原子环形缓冲区,它将覆盖旧数据。如果您愿意,可以设置较小的尺寸。

综上所述,听起来您并不需要队列(或环形缓冲区)。你只需要一个互斥量:

type SensorData struct{

  mu sync.RWMutex

  last float64

}


func (d *SensorData) Store(data float64) {

 mu.Lock()

 defer mu.Unlock()


 d.last = data

}


func (d *SensorData) Get() float64 {

 mu.RLock()

 defer mu.RUnlock()


 return d.last

}

这使用 aRWMutex这意味着许多东西可以同时从中读取,而只有一个东西可以写入。它将像您所说的那样存储一个条目。


查看完整回答
反对 回复 2023-05-15
?
慕妹3242003

TA贡献1824条经验 获得超6个赞

不,通道是 FIFO 缓冲区,句号。这就是渠道的运作方式及其唯一目的。如果您只想要最新的值,请考虑只使用一个受互斥锁保护的变量;每当有新数据进入时写入它,无论何时读取它,您将始终读取最新值。



查看完整回答
反对 回复 2023-05-15
?
呼如林

TA贡献1798条经验 获得超3个赞

渠道服务于特定目的。您可能希望使用锁内的代码,并在设置新值时更新变量。

这样接收者将始终获得最新的价值。


查看完整回答
反对 回复 2023-05-15
?
拉丁的传说

TA贡献1789条经验 获得超8个赞

你不能直接从一个通道获取它,但你可以为每个值使用一个通道,并在有新值时得到通知:


package main


import (

    "fmt"

    "strconv"

    "sync"

    "time"

)


type LatestChannel struct {

    n    float64

    next chan struct{}

    mu   sync.Mutex

}


func New() *LatestChannel {

    return &LatestChannel{next: make(chan struct{})}

}


func (c *LatestChannel) Push(n float64) {

    c.mu.Lock()

    c.n = n

    old := c.next

    c.next = make(chan struct{})

    c.mu.Unlock()

    close(old)

}


func (c *LatestChannel) Get() (float64, <-chan struct{}) {

    c.mu.Lock()

    n := c.n

    next := c.next

    c.mu.Unlock()

    return n, next

}


func getSensorData(c *LatestChannel) {

    time.Sleep(1 * time.Second)

    c.Push(2.1)

    time.Sleep(100 * time.Millisecond)

    c.Push(2.2)

    time.Sleep(100 * time.Millisecond)

    c.Push(2.3)

    time.Sleep(100 * time.Millisecond)

    c.Push(2.4)

    time.Sleep(100 * time.Millisecond)

    c.Push(2.5)

}


func main() {

    s := 1.1


    c := New()

    _, hasNext := c.Get()

    go getSensorData(c)


    for {

        select {

        case <-hasNext:

            s, hasNext = c.Get()

            fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))

        default:

            // no new values in the channel

        }

        fmt.Println(s)


        time.Sleep(250 * time.Millisecond) // Do heavy "work"

    }

}

查看完整回答
反对 回复 2023-05-15
?
心有法竹

TA贡献1866条经验 获得超5个赞

试试这个包https://github.com/subbuv26/chanup

它允许生产者用最新值更新通道,从而替换最新值。并且产品不会被阻止。(有了这个,过时的值被覆盖)。因此,在消费者方面,总是只读取最新的项目。

import "github.com/subbuv26/chanup"

ch := chanup.GetChan()

_ := ch.Put(testType{

    a: 10,

    s: "Sample",

})

_ := ch.Update(testType{

    a: 20,

    s: "Sample2",

})

// Continue updating with latest values

...


...

// On consumer end

val := ch.Get()

// val contains latest value


查看完整回答
反对 回复 2023-05-15
  • 5 回答
  • 0 关注
  • 143 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信