为了账号安全,请及时绑定邮箱和手机立即绑定

缓冲区为空后关闭“工人”程序

缓冲区为空后关闭“工人”程序

Go
30秒到达战场 2021-11-08 09:36:26
我希望我的 go 例程工作人员(ProcessToDo()在下面的代码中)等到所有“排队”的工作都处理完毕后再关闭。工作程序有一个“待办事项”通道(缓冲),工作通过该通道发送给它。它有一个“完成”通道告诉它开始关闭。文档说,如果满足多个选择,通道上的选择将选择一个“伪随机值”……这意味着在所有缓冲工作完成之前触发关闭(返回)。在下面的代码示例中,我希望打印所有 20 条消息...package mainimport (    "time"    "fmt")func ProcessToDo(done chan struct{}, todo chan string) {    for {        select {        case work, ok := <-todo:            if !ok {                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")                return            }            fmt.Printf("todo: %q\n", work)            time.Sleep(100 * time.Millisecond)        case _, ok := <-done:            if ok {                fmt.Printf("Shutting down ProcessToDo - done message received!\n")            } else {                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")            }            close(todo)            return        }    }}func main() {    done := make(chan struct{})    todo := make(chan string, 100)    go ProcessToDo(done, todo)    for i := 0; i < 20; i++ {        todo <- fmt.Sprintf("Message %02d", i)    }    fmt.Println("*** all messages queued ***")    time.Sleep(1 * time.Second)    close(done)    time.Sleep(4 * time.Second)}
查看完整描述

3 回答

?
Smart猫小萌

TA贡献1911条经验 获得超7个赞

done在您的情况下,频道是完全没有必要的,因为您可以通过关闭todo频道本身来发出关闭信号。


并for range在通道上使用,它将迭代直到通道关闭且其缓冲区为空。


你应该有一个done通道,但只是为了让 goroutine 本身可以发出它完成工作的信号,以便主 goroutine 可以继续或退出。


这个变体与你的等价,更简单,不需要time.Sleep()调用等待其他 goroutines(这将太错误和不确定)。在Go Playground上试试:


func ProcessToDo(done chan struct{}, todo chan string) {

    for work := range todo {

        fmt.Printf("todo: %q\n", work)

        time.Sleep(100 * time.Millisecond)

    }

    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")

    done <- struct{}{} // Signal that we processed all jobs

}


func main() {

    done := make(chan struct{})

    todo := make(chan string, 100)


    go ProcessToDo(done, todo)


    for i := 0; i < 20; i++ {

        todo <- fmt.Sprintf("Message %02d", i)

    }


    fmt.Println("*** all messages queued ***")

    close(todo)

    <-done // Wait until the other goroutine finishes all jobs

}

还要注意,worker goroutines 应该使用信号完成,defer这样如果主 goroutine 以某种意外的方式返回或恐慌,它就不会卡在等待 worker 中。所以它应该像这样开始:


defer func() {

    done <- struct{}{} // Signal that we processed all jobs

}()

您还可以使用sync.WaitGroup将主 goroutine 同步到工作程序(以等待它)。事实上,如果你打算使用多个工作 goroutines,那比从done通道读取多个值更干净。此外,发出完成信号更简单,WaitGroup因为它有一个Done()方法(这是一个函数调用),因此您不需要匿名函数:


defer wg.Done()

有关完整示例,请参阅JimB 的 anwserWaitGroup。


使用for range信道同步,因此你不需要任何额外的代码,将同步访问:如果你想使用多工作够程也是地道的todo通道或在收到该职位。如果您关闭 中的todo通道main(),这将正确地向所有工作程序 goroutine 发出信号。但当然,所有排队的作业都将被接收和处理一次。


现在采用WaitGroup 用于使主 goroutine 等待 worker的变体(JimB 的回答):如果您想要 1 个以上的 worker goroutine 怎么办;并发(并且很可能是并行)处理您的工作?


您唯一需要在代码中添加/更改的是:真正启动其中的多个:


for i := 0; i < 10; i++ {

    wg.Add(1)

    go ProcessToDo(todo)

}

无需更改任何其他内容,您现在拥有一个正确的并发应用程序,它使用 10 个并发 goroutines 接收和处理您的作业。并且我们没有使用任何“丑陋” time.Sleep()(我们使用了一个但只是为了模拟慢速处理,而不是等待其他 goroutine),并且您不需要任何额外的同步。


查看完整回答
反对 回复 2021-11-08
?
慕莱坞森

TA贡献1810条经验 获得超4个赞

让通道的使用者关闭它通常是一个坏主意,因为在关闭的通道上发送是一种恐慌。


在这种情况下,如果您不想在所有消息发送之前中断消费者,只需使用for...range循环并在完成后关闭通道。您还需要一个类似 a 的信号WaitGroup来等待 goroutine 完成(而不是使用 time.Sleep)


http://play.golang.org/p/r97vRPsxEb


var wg sync.WaitGroup


func ProcessToDo(todo chan string) {

    defer wg.Done()

    for work := range todo {

        fmt.Printf("todo: %q\n", work)

        time.Sleep(100 * time.Millisecond)


    }

    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")


}


func main() {

    todo := make(chan string, 100)

    wg.Add(1)

    go ProcessToDo(todo)


    for i := 0; i < 20; i++ {

        todo <- fmt.Sprintf("Message %02d", i)

    }


    fmt.Println("*** all messages queued ***")

    close(todo)

    wg.Wait()

}


查看完整回答
反对 回复 2021-11-08
?
拉莫斯之舞

TA贡献1820条经验 获得超10个赞

我认为接受的答案对于这个特定的例子非常有效。然而,要回答“缓冲区为空后关闭“工人”程序”的问题 - 一个更优雅的解决方案是可能的。


worker 可以在缓冲区为空时返回,而无需通过关闭通道来发出信号。


如果工作人员需要处理的任务数量未知,这将特别有用。


在这里查看:https : //play.golang.org/p/LZ1y0eIRMeS


package main


import (

    "fmt"

    "time"

    "math/rand"

)


func main() {

    rand.Seed(time.Now().UnixNano())

    ch := make(chan interface{}, 10)


    go worker(ch)

    for i := 1; i <= rand.Intn(9) + 1; i++ {

            ch <- i

    }


    blocker := make(chan interface{})

    <-blocker

}


func worker(ch chan interface{}){   

    for {

        select {

        case msg := <- ch:

            fmt.Println("msg: ", msg)

        default:

            fmt.Println("exiting worker")

            return

        }

    }       

}


查看完整回答
反对 回复 2021-11-08
  • 3 回答
  • 0 关注
  • 166 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信