3 回答
TA贡献1911条经验 获得超7个赞
done在您的情况下,频道是完全没有必要的,因为您可以通过关闭todo频道本身来发出关闭信号。
并for range在通道上使用,它将迭代直到通道关闭且其缓冲区为空。
你应该有一个done通道,但只是为了让 goroutine 本身可以发出它完成工作的信号,以便主 goroutine 可以继续或退出。
这个变体与你的等价,更简单,不需要time.Sleep()调用等待其他 goroutines(这将太错误和不确定)。在Go Playground上试试:
func ProcessToDo(done chan struct{}, todo chan string) {
for work := range todo {
fmt.Printf("todo: %q\n", work)
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
done <- struct{}{} // Signal that we processed all jobs
}
func main() {
done := make(chan struct{})
todo := make(chan string, 100)
go ProcessToDo(done, todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
close(todo)
<-done // Wait until the other goroutine finishes all jobs
}
还要注意,worker goroutines 应该使用信号完成,defer这样如果主 goroutine 以某种意外的方式返回或恐慌,它就不会卡在等待 worker 中。所以它应该像这样开始:
defer func() {
done <- struct{}{} // Signal that we processed all jobs
}()
您还可以使用sync.WaitGroup将主 goroutine 同步到工作程序(以等待它)。事实上,如果你打算使用多个工作 goroutines,那比从done通道读取多个值更干净。此外,发出完成信号更简单,WaitGroup因为它有一个Done()方法(这是一个函数调用),因此您不需要匿名函数:
defer wg.Done()
有关完整示例,请参阅JimB 的 anwserWaitGroup。
使用for range信道同步,因此你不需要任何额外的代码,将同步访问:如果你想使用多工作够程也是地道的todo通道或在收到该职位。如果您关闭 中的todo通道main(),这将正确地向所有工作程序 goroutine 发出信号。但当然,所有排队的作业都将被接收和处理一次。
现在采用WaitGroup 用于使主 goroutine 等待 worker的变体(JimB 的回答):如果您想要 1 个以上的 worker goroutine 怎么办;并发(并且很可能是并行)处理您的工作?
您唯一需要在代码中添加/更改的是:真正启动其中的多个:
for i := 0; i < 10; i++ {
wg.Add(1)
go ProcessToDo(todo)
}
无需更改任何其他内容,您现在拥有一个正确的并发应用程序,它使用 10 个并发 goroutines 接收和处理您的作业。并且我们没有使用任何“丑陋” time.Sleep()(我们使用了一个但只是为了模拟慢速处理,而不是等待其他 goroutine),并且您不需要任何额外的同步。
TA贡献1810条经验 获得超4个赞
让通道的使用者关闭它通常是一个坏主意,因为在关闭的通道上发送是一种恐慌。
在这种情况下,如果您不想在所有消息发送之前中断消费者,只需使用for...range循环并在完成后关闭通道。您还需要一个类似 a 的信号WaitGroup来等待 goroutine 完成(而不是使用 time.Sleep)
http://play.golang.org/p/r97vRPsxEb
var wg sync.WaitGroup
func ProcessToDo(todo chan string) {
defer wg.Done()
for work := range todo {
fmt.Printf("todo: %q\n", work)
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
}
func main() {
todo := make(chan string, 100)
wg.Add(1)
go ProcessToDo(todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
close(todo)
wg.Wait()
}
TA贡献1820条经验 获得超10个赞
我认为接受的答案对于这个特定的例子非常有效。然而,要回答“缓冲区为空后关闭“工人”程序”的问题 - 一个更优雅的解决方案是可能的。
worker 可以在缓冲区为空时返回,而无需通过关闭通道来发出信号。
如果工作人员需要处理的任务数量未知,这将特别有用。
在这里查看:https : //play.golang.org/p/LZ1y0eIRMeS
package main
import (
"fmt"
"time"
"math/rand"
)
func main() {
rand.Seed(time.Now().UnixNano())
ch := make(chan interface{}, 10)
go worker(ch)
for i := 1; i <= rand.Intn(9) + 1; i++ {
ch <- i
}
blocker := make(chan interface{})
<-blocker
}
func worker(ch chan interface{}){
for {
select {
case msg := <- ch:
fmt.Println("msg: ", msg)
default:
fmt.Println("exiting worker")
return
}
}
}
- 3 回答
- 0 关注
- 166 浏览
添加回答
举报