1 回答
TA贡献2065条经验 获得超13个赞
使用 async.WaitGroup来跟踪正在运行的 goroutines 的数量。每个 goroutine 在不再从通道获取数据后退出。一旦WaitGroup完成,清理就可以完成了。
是这样的:
import (
"sync"
"time"
)
type Data interface{} // just an example
type Consumer interface {
Consume(Data) Data
CleanUp()
Count() int
Timeout() time.Duration
}
func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
wg := sync.WaitGroup{}
for i := 0; i < consumer.Count(); i++ {
wg.Add(1)
go func() {
consumeLoop:
for {
select {
case v, ok := <-inCh: // 'ok' says if the channel is still open
if !ok {
break consumeLoop
}
outCh <- consumer.Consume(v)
case <-time.After(consumer.Timeout()):
break consumeLoop
}
}
wg.Done()
}()
}
wg.Wait()
consumer.CleanUp()
close(outCh)
}
在管道的每个阶段,您都可以使用与上述类似的过程来启动消费者。
- 1 回答
- 0 关注
- 84 浏览
添加回答
举报