4 回答
TA贡献1796条经验 获得超10个赞
是的,WaitGroup
是正确的答案。根据doc ,您可以随时使用WaitGroup.Add
计数器大于零。
请注意,当计数器为零时发生的具有正增量的调用必须发生在等待之前。具有负增量的调用或在计数器大于零时开始的具有正增量的调用可能随时发生。通常这意味着对 Add 的调用应该在创建 goroutine 或其他要等待的事件的语句之前执行。如果重复使用 WaitGroup 来等待多个独立的事件集,则必须在所有先前的 Wait 调用返回后发生新的 Add 调用。请参阅 WaitGroup 示例。
Close
但是一个技巧是,在调用之前,您应该始终保持计数器大于零。这通常意味着您应该调用wg.Add
in NewFoo
(或类似的东西)并wg.Done
in Close
. 并且为了防止多次调用Done
破坏等待组,你应该包装Close
成sync.Once
. 您可能还想防止Bar()
调用 new。
TA贡献1811条经验 获得超6个赞
我认为无限期地等待所有 go 例程完成不是正确的方法。如果其中一个 go routines 被阻塞或说它由于某种原因挂起并且从未成功终止,应该发生什么情况 kill 进程或等待 go routines 完成?
相反,无论所有例程是否已完成,您都应该等待一段时间并终止应用程序。
上下文包可用于向所有 go 例程发送信号以处理 kill 信号。
appCtx, cancel := context.WithCancel(context.Background())
这里 appCtx 必须传递给所有的 go 例程。
在退出信号调用cancel()
。
作为 go 例程运行的函数可以处理如何处理取消上下文。
TA贡献1853条经验 获得超18个赞
WaitGroup是一种方式,但是,Go 团队errgroup完全针对您的用例引入了。leaf bebop 的回答中最不方便的部分是忽视错误处理。错误处理是存在的原因errgroup。惯用的 go 代码不应该吞下错误。
但是,保留结构的签名Foo(装饰性的除外workerNumber)——并且没有错误处理——我的建议如下所示:
package main
import (
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
type Foo struct {
errg errgroup.Group
}
func NewFoo() *Foo {
foo := &Foo{
errg: errgroup.Group{},
}
return foo
}
func (a *Foo) Bar(workerNumber int) {
a.errg.Go(func() error {
select {
// simulates the long running clals
case <-time.After(time.Second * time.Duration(rand.Intn(10))):
fmt.Println(fmt.Sprintf("worker %d completed its work", workerNumber))
return nil
}
})
}
func (a *Foo) Close() {
a.errg.Wait()
}
func main() {
foo := NewFoo()
for i := 0; i < 10; i++ {
foo.Bar(i)
}
<-time.After(time.Second * 5)
fmt.Println("Waiting for workers to complete...")
foo.Close()
fmt.Println("Done.")
}
这里的好处是,如果你在你的代码中引入错误处理(你应该),你只需要稍微修改这段代码:简而言之,将返回errg.Wait()第一个 redis 错误,并且Close()可以通过堆栈向上传播它(到 main,在这种情况下)。
也可以使用该context.Context包,如果调用失败,您还可以立即取消任何正在运行的 redis 调用。文档中有这方面的示例errgroup。
TA贡献2039条经验 获得超7个赞
我经常使用的模式是:https ://play.golang.org/p/ibMz36TS62z
package main
import (
"fmt"
"sync"
"time"
)
type response struct {
message string
}
func task(i int, done chan response) {
time.Sleep(1 * time.Second)
done <- response{fmt.Sprintf("%d done", i)}
}
func main() {
responses := GetResponses(10)
fmt.Println("all done", len(responses))
}
func GetResponses(n int) []response {
donequeue := make(chan response)
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
wg.Add(1)
go func(value int) {
defer wg.Done()
task(value, donequeue)
}(i)
}
go func() {
wg.Wait()
close(donequeue)
}()
responses := []response{}
for result := range donequeue {
responses = append(responses, result)
}
return responses
}
这也使得节流变得容易:https ://play.golang.org/p/a4MKwJKj634
package main
import (
"fmt"
"sync"
"time"
)
type response struct {
message string
}
func task(i int, done chan response) {
time.Sleep(1 * time.Second)
done <- response{fmt.Sprintf("%d done", i)}
}
func main() {
responses := GetResponses(10, 2)
fmt.Println("all done", len(responses))
}
func GetResponses(n, concurrent int) []response {
throttle := make(chan int, concurrent)
for i := 0; i < concurrent; i++ {
throttle <- i
}
donequeue := make(chan response)
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
wg.Add(1)
<-throttle
go func(value int) {
defer wg.Done()
throttle <- 1
task(value, donequeue)
}(i)
}
go func() {
wg.Wait()
close(donequeue)
}()
responses := []response{}
for result := range donequeue {
responses = append(responses, result)
}
return responses
}
- 4 回答
- 0 关注
- 152 浏览
添加回答
举报