为了账号安全,请及时绑定邮箱和手机立即绑定

Go - 如何知道输出通道何时完成

Go - 如何知道输出通道何时完成

Go
慕的地6264312 2021-09-10 14:53:05
我尝试遵循 Rob Pike 在“并发不是并行”的演讲中的例子,并做了这样的事情:我正在启动许多 go 例程作为从输入通道读取的工作人员,执行一些处理,然后通过输出通道发送结果.然后我开始另一个 go 例程,它从某个源读取数据并通过他们的输入通道将其发送给工作人员。最后,我想遍历输出通道中的所有结果并对其进行处理。问题是,由于工作在工作人员之间分配,我不知道所有工作人员何时完成,因此我可以停止向输出通道询问更多结果,并且我的程序可以正常结束。了解工作人员何时完成将结果发送到输出通道的最佳做法是什么?
查看完整描述

3 回答

?
阿晨1998

TA贡献2037条经验 获得超6个赞

var Z = "Z"


func Loop() {

    sc := make(chan *string)

    ss := make([]string, 0)

    done := make(chan struct{}, 1)

    go func() {

        //1 QUERY

        slice1 := []string{"a", "b", "c"}

        //2 WG INIT

        var wg1 sync.WaitGroup

        wg1.Add(len(slice1))

        //3 LOOP->

        loopSlice1(slice1, sc, &wg1)

        //7 WG WAIT<-

        wg1.Wait()

        sc <- &Z

        done <- struct{}{}

    }()


    go func() {

        var cc *string

        for {

            cc = <-sc

            log.Infof("<-sc %s", *cc)

            if *cc == Z {

                break

            }

            ss = append(ss, *cc)

        }

    }()

    <-done

    log.Infof("FUN: %#v", ss)

}


func loopSlice1(slice1 []string, sc chan *string, wg1 *sync.WaitGroup) {

    for i, x := range slice1 {

        //4 GO

        go func(n int, v string) {

            //5 WG DONE

            defer wg1.Done()

            //6 DOING

            //[1 QUERY

            slice2 := []string{"X", "Y", "Z"}

            //[2 WG INIT

            var wg2 sync.WaitGroup

            wg2.Add(len(slice2))

            //[3 LOOP ->

            loopSlice2(n, v, slice2, sc, &wg2)

            //[7 WG WAIT <-

            wg2.Wait()

        }(i, x)

    }

}


func loopSlice2(n1 int, v1 string, slice2 []string, sc chan *string, wg2 *sync.WaitGroup) {

    for j, y := range slice2 {

        //[4 GO

        go func(n2 int, v2 string) {

            //[5 WG DONE

            defer wg2.Done()

            //[6 DOING

            r := fmt.Sprintf("%v%v %v,%v", n1, n2, v1, v2)

            sc <- &r

        }(j, y)

    }

}


查看完整回答
反对 回复 2021-09-10
?
不负相思意

TA贡献1777条经验 获得超10个赞

请我首先澄清您的术语:对渠道末端的误解可能会导致以后出现问题。您询问“输出通道”和“输入通道”。哪有这回事; 只有渠道。


每个通道都有两端:输出(写入)端和输入(读取)端。我会假设这就是你的意思。


现在回答你的问题。


以最简单的情况为例:您只有一个发送方 goroutine 写入通道,并且只有一个工作 goroutine 从另一端读取,并且通道的缓冲为零。发送方 goroutine 将在写入每个项目时阻塞,直到该项目被消耗。通常,这在第一次发生时很快。一旦第一个项目传递给工作人员,工作人员就会很忙,发件人必须等待才能传递第二项内容。因此,乒乓效应如下:作者或读者会忙,但不会两者都忙。在 Rob Pike 所描述的意义上,goroutine 将是并发的,但实际上并不总是并行执行。


如果您有许多从通道读取的工作程序 goroutine(并且其输入端由所有人共享),则发送方最初可以将一个项目分发给每个工作程序,但随后必须等待它们工作(类似于上面描述的乒乓球案例)。最后,当所有项目都由发送方发送后,它的工作就完成了。然而,读者可能还没有完成他们的工作。有时我们关心发件人是否提前完成,有时我们不关心。知道何时发生这种情况最容易通过 WaitGroup 完成(参见Not_a_Golfer的回答和我对相关问题的回答)。


还有一个稍微复杂一点的替代方法:您可以使用返回通道来完成信号传输,而不是使用WaitGroup. 这并不难做到,但WaitGroup在这种情况下是首选,更简单。


相反,如果通道包含缓冲区,则发送方发送最后一项的时间点会更快发生。在通道每个 worker 有一个缓冲区空间的极限情况下;这将允许发件人非常快速地完成,然后,可能继续处理其他事情。(任何比这更多的缓冲都是浪费)。


发送者的这种解耦允许完全异步的行为模式,深受使用其他技术堆栈(Node-JS 和 JVM 的人)的人的喜爱。与它们不同的是,Go不需要您这样做,但您可以选择。


早在 90 年代初期,作为批量同步并行 (BSP) 策略工作的副作用,Leslie Valiant 证明有时非常简单的同步策略可能很便宜。关键因素是需要足够的并行松弛(也称为过度并行)来保持处理器内核忙碌。这意味着必须有足够多的其他工作要做,以便任何特定的 goroutine 被阻塞一段时间都无关紧要。


奇怪的是,这可能意味着使用较少数量的 goroutine 可能比使用较大数量的 goroutine 需要更多的小心。


了解过度并行的影响是有用的:如果整个网络具有过度并行,通常没有必要付出额外的努力来使所有内容异步,因为无论哪种方式,CPU 内核都会很忙。


因此,虽然知道如何等待发件人完成很有用,但更大的应用程序可能不需要您以同样的方式关注。


作为最后一个脚注,WaitGroup是BSP 中使用的意义上的障碍。通过结合障碍和渠道,您可以同时使用 BSP 和 CSP。


查看完整回答
反对 回复 2021-09-10
?
aluckdog

TA贡献1847条经验 获得超7个赞

我个人喜欢为此使用 a sync.WaitGroup。等待组是一个同步计数器,它具有三种方法 - Wait()、Done()和Add()。您要做的是增加等待组的计数器,将其传递给工作人员,并让Done()他们在完成后调用。然后,您只需阻塞另一端的等待组并在它们全部完成后关闭输出通道,从而导致输出处理器退出。


基本上:


// create the wait group

wg := sync.WaitGroup{}


// this is the output channel

outchan := make(chan whatever)


// start the workers

for i := 0; i < N; i++ {

   wg.Add(1) //we increment by one the waitgroup's count


   //the worker pushes data onto the output channel and calls wg.Done() when done

   go work(&wg, outchan)

}


// this is our "waiter" - it blocks until all workers are done and closes the channel

go func() {

  wg.Wait()

  close(outchan)

}()


//this loop will exit automatically when outchan is closed

for item := range outchan {

   workWithIt(item)

}


// TADA!


查看完整回答
反对 回复 2021-09-10
  • 3 回答
  • 0 关注
  • 156 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信