2 回答
TA贡献1856条经验 获得超11个赞
阅读描述和片段我无法准确理解你想要实现的目标,但我有一些我每天使用的频道的提示和模式并且认为有帮助。
该
context
包对于以安全的方式管理 goroutines 的状态非常有帮助。在您的示例中,time.After
用于结束主程序,但在非主函数中它可能会泄漏 goroutines:如果您改为使用context.Context
它并将其传递给 gorotuines(它通常传递函数的第一个参数),您将能够控制取消下游呼叫。这简单解释一下。通常的做法是在生成消息并在通道中发送消息的函数中创建通道(并返回它们)。同样的功能应该负责关闭通道,例如,
defer close(channel)
当它完成写入时。这很方便,因为当通道被缓冲时,即使它仍然有数据也可以关闭它:Go 运行时实际上会等到所有消息都被轮询后再关闭。对于无缓冲通道,该函数将无法通过通道发送消息,直到通道的读者准备好轮询它,因此无法退出。 这是一个例子(没有递归)。在此示例中,我们可以close
在缓冲或未缓冲时使用通道,因为发送将阻塞直到for := range
在主 goroutine 的通道上从中读取。 这是相同原理的变体,通道作为参数传递。我们可以与通道
sync.WaitGroup
一起使用,为单个 goroutine 发出完成信号,并让“编排”goroutine 知道通道可以关闭,因为所有消息生产者都已完成向通道发送数据。与第 1 点相同的注意事项适用于close
操作。 这是一个示例,显示了 waitGroup 的使用和通道的外部关闭器。渠道可以有方向!请注意,在示例中,我在将箭头传入/外部函数时添加/删除了通道旁边的箭头(例如
<-chan string
, 或)。chan<- string
这告诉编译器,一个通道在该函数的范围内分别只读或写。这在两个方面有所帮助:编译器将生成更高效的代码,因为有方向的通道将有一个锁而不是 2 个。
函数的签名描述了它是否只会使用通道写入(可能
close()
)或读取:请记住,range
当通道关闭时,从带有 a 的通道读取会自动停止迭代。您可以构建通道的通道:
make(chan chan string)
是构建处理管道的有效(且有用)构造。它的一个常见用法是扇入 goroutine,它收集一系列 channel-producing goroutines 的多个输出。 这是如何使用它们的示例。
本质上,回答您最初的问题:
从作为 goroutine 开始的函数对其自身进行递归调用是一种有效的方法吗?
如果你真的需要递归,最好将它与并发代码分开处理:创建一个专用函数,递归地将数据发送到通道,并在调用者中协调通道的关闭。
在所有派生的 goroutine 完成之前,从 cRes 读取结果的惯用方式是什么?我在某处读到,计算完成后通道应该关闭,但我只是想不通在这种情况下如何集成它。
一个很好的参考是Go Concurrency Patterns: Pipelines and cancellation:这是一个相当古老的帖子(在context
std lib 中存在包之前),我认为Parallel digestion
你正在寻找解决原始问题的方法。
TA贡献1839条经验 获得超15个赞
正如 torek 所提到的,我在 waitgroup 完成等待后剥离了一个关闭通道的匿名函数。还需要一些逻辑,仅在goroutine 生成级别的递归返回后才wg.Done()调用生成的 goroutine 。
一般来说,我认为这是一个有用的成语(如果我错了请纠正我:))
游乐场:https ://go.dev/play/p/bQjHENsZL25
func main() {
cRes := make(chan string, 100)
numLevels := 3
spread := 3
startConcurrencyAtLevel := 2
var wg sync.WaitGroup
nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)
go func() {
// time.Sleep(1 * time.Second) // edit: code should work without this initial sleep
wg.Wait()
close(cRes)
}()
for r := range cRes {
fmt.Println(r)
}
fmt.Println("Done!")
}
func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {
if len(path) == maxLevels {
// some longer running task here associated with the found path
cRes <- path
return
}
for i := 1; i <= spread; i++ {
nextPath := path + fmt.Sprint(i)
if len(path) == startConcurrencyAtLevel {
go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
} else {
nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
}
}
}
- 2 回答
- 0 关注
- 76 浏览
添加回答
举报