3 回答
TA贡献1848条经验 获得超10个赞
在其他答案之上。
请(非常)小心,关闭通道应该发生在写入调用站点上,而不是读取调用站点上。在正在写入的GoCountColumns通道中r,关闭通道的责任落在GoCountColumns函数上。技术原因是,它是唯一确定该通道将不再被写入的参与者,因此可以安全关闭。
func GoCountColumns(in chan []string, r chan Result, quit chan int) {
defer close(r) // this line.
for {
select {
case data := <-in:
r <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
如果我可以说,函数参数命名约定是将目标作为第一个参数,将源作为第二个参数,然后使用其他参数。GoCountColumns优选地写成:
func GoCountColumns(dst chan Result, src chan []string, quit chan int) {
defer close(dst)
for {
select {
case data := <-src:
dst <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
quit您在流程开始后立即致电。这是不合逻辑的。该quit命令是强制退出序列,一旦检测到退出信号就应该调用它,以尽可能以最佳状态(可能全部损坏)强制退出当前处理。换句话说,您应该依赖该signal.Notify包来捕获退出事件,并通知您的工作人员退出。请参阅https://golang.org/pkg/os/signal/#example_Notify
为了编写更好的并行代码,首先列出管理程序生命周期所需的例程,确定需要阻塞的例程以确保程序在退出之前完成。
在您的代码中,存在read, map。为了确保处理完整,程序主函数必须确保在退出时捕获信号,map然后再退出。请注意,该read功能并不重要。
然后,您还需要从用户输入捕获退出事件所需的代码。
总的来说,我们似乎需要阻止两个事件来管理生命周期。示意性地说,
func main(){
go read()
go map(mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
}
}
这个简单的代码很好process or die。事实上,当捕获到用户事件时,程序立即退出,而不给其他例程机会执行停止时所需的操作。
为了改善这些行为,您首先需要一种方法来表明程序想要离开其他例程,其次需要一种方法来等待这些例程在离开之前完成其停止序列。
要发出退出事件或取消信号,您可以使用 a context.Context,将其传递给工作人员,让他们听。
再次,示意性地,
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
}
}
(稍后将详细阅读和绘制地图)
要等待完成,很多事情都是可能的,只要它们是线程安全的。通常,sync.WaitGroup使用 a。或者,在像您这样的情况下,只有一个例程需要等待,我们可以重新使用当前mapDone通道。
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
<-mapDone
}
}
这很简单也很直接。但这并不完全正确。最后一个mapDone chan可能会永远阻塞并使程序无法停止。因此,您可以实现第二个信号处理程序或超时。
示意性地,超时解决方案是
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
select {
case <-mapDone:
case <-time.After(time.Second):
}
}
}
您还可以在最后一次选择中累积信号处理和超时。
最后,有几件事要讲read和map上下文聆听。
首先map,实现需要定期读取context.Done通道来检测cancellation。
这是简单的部分,只需要更新 select 语句。
func GoCountColumns(ctx context.Context, dst chan Result, src chan []string) {
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
现在这read部分有点棘手,因为它是一个 IO,它不提供select强大的编程接口,并且监听上下文通道取消可能看起来很矛盾。这是。由于 IO 是阻塞的,因此无法侦听上下文。并且在从上下文通道读取时,无法读取 IO。在您的情况下,解决方案需要了解您的读取循环与您的程序生命周期无关(还记得我们只监听mapDone吗?),并且我们可以忽略上下文。
在其他情况下,例如,如果您想在读取最后一个字节时重新启动(因此在每次读取时,我们都会增加 n,计算字节数,并且我们希望在停止时保存该值)。然后,需要启动一个新的例程,因此,多个例程需要等待完成。在这种情况下,async.WaitGroup会更合适。
示意性地说,
func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go saveN(ctx,&wg)
wg.Add(1)
go map(ctx,&wg)
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
在最后的代码中,正在传递等待组。例程负责调用wg.Done(),当所有例程完成后,processDone通道关闭,以发出选择信号。
func GoCountColumns(ctx context.Context, dst chan Result, src chan []string, wg *sync.WaitGroup) {
defer wg.Done()
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
尚未确定哪种模式是首选,但您也可能会看到waitgroup仅在调用站点进行管理。
func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go func(){
defer wg.Done()
saveN(ctx)
}()
wg.Add(1)
go func(){
defer wg.Done()
map(ctx)
}()
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
除了所有这些问题和 OP 问题之外,您必须始终预先评估并行处理对于给定任务的相关性。没有独特的秘诀,练习和衡量你的代码性能。参见 pprof.
TA贡献1850条经验 获得超11个赞
这段代码中发生的事情太多了。您应该将代码重组为服务于特定目的的短函数,以便其他人可以轻松地帮助您(也可以帮助您自己)。
有多种方法可以让一个 go-routine 等待其他工作完成。最常见的方法是使用等待组(我提供的示例)或通道。
func processSomething(...) {
...
}
func main() {
workers := &sync.WaitGroup{}
for i := 0; i < numWorkers; i++ {
workers.Add(1) // you want to call this from the calling go-routine and before spawning the worker go-routine
go func() {
defer workers.Done() // you want to call this from the worker go-routine when the work is done (NOTE the defer, which ensures it is called no matter what)
processSomething(....) // your async processing
}()
}
// this will block until all workers have finished their work
workers.Wait()
}
TA贡献1775条经验 获得超8个赞
您可以使用通道来阻塞,main直到 Goroutine 完成。
package main
import (
"log"
"time"
)
func main() {
c := make(chan struct{})
go func() {
time.Sleep(3 * time.Second)
log.Println("bye")
close(c)
}()
// This blocks until the channel is closed by the routine
<-c
}
无需向通道写入任何内容。读取会被阻塞,直到读取数据或者我们在这里使用的通道关闭为止。
- 3 回答
- 0 关注
- 138 浏览
添加回答
举报