为了账号安全,请及时绑定邮箱和手机立即绑定

关闭缓冲通道时是否应该将其排空

关闭缓冲通道时是否应该将其排空

Go
狐的传说 2021-12-20 15:20:50
给定 Go 中(部分)填充的缓冲通道ch := make(chan *MassiveStruct, n)for i := 0; i < n; i++ {    ch <- NewMassiveStruct()}是否建议在关闭通道时(由作者)也排空通道,以防读者何时从中读取(例如,数量有限并且他们目前正忙)?那是close(ch)for range ch {}如果通道上还有其他并发阅读器,这样的循环是否保证结束?上下文:具有固定数量工作器的队列服务,当服务关闭时,它应该停止处理任何排队的东西(但不一定在之后被 GC 处理)。所以我要向工作人员表明服务正在终止。我可以立即排空剩余的“队列”,让 GC 释放分配的资源,我可以读取和忽略工作程序中的值,我可以离开通道,因为读取器正在运行,并将通道设置为 nil 在写入器中GC 清理一切。我不确定哪种方式最干净。
查看完整描述

3 回答

?
FFIVE

TA贡献1797条经验 获得超6个赞

这取决于您的程序,但一般来说我倾向于说不(您不需要在关闭频道之前清除频道):如果您关闭频道时频道中有项目,任何仍在从频道阅读的读者都会接收项目,直到通道为空。


下面是一个例子:


package main


import (

    "sync"

    "time"

)


func main() {


    var ch = make(chan int, 5)

    var wg sync.WaitGroup

    wg.Add(1)


    for range make([]struct{}, 2) {

        go func() {

            for i := range ch {

                wg.Wait()

                println(i)

            }

        }()

    }


    for i := 0; i < 5; i++ {

        ch <- i

    }

    close(ch)


    wg.Done()

    time.Sleep(1 * time.Second)

}

在这里,程序将输出所有项目,尽管通道在任何读者甚至可以从通道读取之前严格关闭。


查看完整回答
反对 回复 2021-12-20
?
噜噜哒

TA贡献1784条经验 获得超7个赞

有更好的方法来实现您想要实现的目标。您当前的方法只会导致丢弃一些记录,并随机处理其他记录(因为排水循环正在与所有消费者竞争)。这并没有真正解决目标。


你想要的是取消。这是Go 并发模式中的一个示例:管道和取消


func sq(done <-chan struct{}, in <-chan int) <-chan int {

    out := make(chan int)

    go func() {

        defer close(out)

        for n := range in {

            select {

            case out <- n * n:

            case <-done:

                return

            }

        }

    }()

    return out

}

您将一个done通道传递给所有 goroutine,并在您希望它们都停止处理时关闭它。如果你经常这样做,你可能会发现这个golang.org/x/net/context包很有用,它正式化了这个模式,并添加了一些额外的功能(比如超时)。



查看完整回答
反对 回复 2021-12-20
?
动漫人物

TA贡献1815条经验 获得超10个赞

我觉得所提供的答案实际上并没有澄清,除了不需要排水也不需要关闭的提示。因此,针对所描述的上下文的以下解决方案对我来说看起来很干净,它终止了工作人员并删除了对他们或相关频道的所有引用,因此,让 GC 清理频道及其内容:


type worker struct {

    submitted chan Task

    stop      chan bool

    p         *Processor

}


// executed in a goroutine

func (w *worker) run() {

    for {

        select {

        case task := <-w.submitted:

            if err := task.Execute(w.p); err != nil {

                logger.Error(err.Error())

            }

        case <-w.stop:

            logger.Warn("Worker stopped")

            return

        }

    }

}


func (p *Processor) Stop() {

    if atomic.CompareAndSwapInt32(&p.status, running, stopped) {

        for _, w := range p.workers {

            w.stop <- true

        }

        // GC all workers as soon as goroutines stop

        p.workers = nil

        // GC all published data when workers terminate

        p.submitted = nil

        // no need to do the following above:

        // close(p.submitted)

        // for range p.submitted {}

    }

}


查看完整回答
反对 回复 2021-12-20
  • 3 回答
  • 0 关注
  • 142 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号