为了账号安全,请及时绑定邮箱和手机立即绑定

go 例程子集上的等待组

go 例程子集上的等待组

Go
RISEBY 2023-03-29 17:30:41
我遇到的情况是,主要的 go 例程将创建“x”go 例程。但它只对要完成的“y”( y < x ) go routines 感兴趣。我希望使用 Waitgroup。但是 Waitgroup 只允许我等待所有的例程。例如,我不能这样做,1. wg.Add (y) 2 create "x" go routines. These routines will call wg.Done() when finished.  3. wg. Wait()当 y+1 go 例程调用 wg.Done() 时会出现恐慌,因为 wg 计数器变为负值。我当然可以使用渠道来解决这个问题,但我对 Waitgroup 是否能解决这个问题感兴趣。
查看完整描述

3 回答

?
米琪卡哇伊

TA贡献1998条经验 获得超6个赞

sync.WaitGroup这是一个简单的计数器,其Wait方法将阻塞直到计数器值达到零。它旨在允许您在允许主要执行流程继续进行之前阻止(或加入)多个 goroutine。

的界面WaitGroup对于您的用例而言没有足够的表现力,也不是设计为。特别是,您不能通过简单地调用wg.Add(y)(where y < x) 来天真地使用它。wg.Done第 (y+1)goroutine调用将导致 panic,因为等待组具有负内部值是错误的。此外,我们不能通过观察 ; 的内部计数器值来“聪明” WaitGroup。这会破坏抽象,无论如何,它的内部状态不会被导出。


实现你自己的!

您可以根据下面的代码使用一些渠道自己实现相关逻辑。从控制台观察,启动了 10 个 goroutine,但在两个完成后,我们 fallthrough 继续在 main 方法中执行。

package main


import (

    "fmt"

    "time"

)


// Set goroutine counts here

const (

    // The number of goroutines to spawn

    x = 10

    // The number of goroutines to wait for completion

    // (y <= x) must hold.

    y = 2

)


func doSomeWork() {

    // do something meaningful

    time.Sleep(time.Second)

}


func main() {

    // Accumulator channel, used by each goroutine to signal completion.

    // It is buffered to ensure the [y+1, ..., x) goroutines do not block

    // when sending to the channel, which would cause a leak. It will be

    // garbage collected when all goroutines end and the channel falls

    // out of scope. We receive y values, so only need capacity to receive

    // (x-y) remaining values.

    accChan := make(chan struct{}, x-y)


    // Spawn "x" goroutines

    for i := 0; i < x; i += 1 {

        // Wrap our work function with the local signalling logic

        go func(id int, doneChan chan<- struct{}) {

            fmt.Printf("starting goroutine #%d\n", id)

            doSomeWork()

            fmt.Printf("goroutine #%d completed\n", id)


            // Communicate completion of goroutine

            doneChan <- struct{}{}

        }(i, accChan)

    }


    for doneCount := 0; doneCount < y; doneCount += 1 {

        <-accChan

    }


    // Continue working

    fmt.Println("Carrying on without waiting for more goroutines")

}

避免泄漏资源

由于这不会等待 [y+1, ..., x) goroutines 完成,因此您应该特别注意函数doSomeWork以消除或最小化工作可能无限期阻塞的风险,这也会导致泄漏。在可能的情况下,消除无限期阻塞 I/O(包括通道操作)或陷入无限循环的可能性。


context当不再需要它们的结果来中断执行时,您可以使用 a向其他 goroutine 发出信号。



查看完整回答
反对 回复 2023-03-29
?
HUH函数

TA贡献1836条经验 获得超4个赞

WaitGroup实际上并不等待 goroutines,它一直等到其内部计数器达到零。如果你只有Add()你关心的 goroutines 的数量,并且你只调用Done()你关心的那些 goroutines,那么Wait()只会阻塞直到你关心的那些 goroutines 完成。您可以完全控制逻辑和流程,对WaitGroup“允许”的内容没有任何限制。



查看完整回答
反对 回复 2023-03-29
?
红糖糍粑

TA贡献1815条经验 获得超6个赞

这些是您要跟踪的特定 y 例程,还是 x 中的任何 y?标准是什么?


更新:


1. 如果您可以控制任何标准来选择matching ygo-routines:


wp.wg.Add(1)如果无法wp.wg.Done()在 goroutine 外部检查您的条件,则可以通过将其作为指针参数传递给 goroutine 来根据您的条件从 goroutine 内部执行和操作。


类似于下面的示例代码。如果您提供有关您正在尝试做的事情的更多详细信息,将能够更具体。


func sampleGoroutine(z int, b string, wg *sync.WaitGroup){


    defer func(){

        if contition1{

            wg.Done()

        }

    }


    if contition1 {

        wg.Add(1)

        //do stuff

    }

}


func main() {

    wg := sync.WaitGroup{}

    for i := 0; i < x; i++ {

        go sampleGoroutine(1, "one", &wg)

    }

    wg.Wait()

}

2. 如果你无法控制哪些,只想要first y:


根据您的评论,您无法控制/希望选择任何特定的 goroutine,而是选择最先完成的 goroutine。如果您想以通用方式执行此操作,则可以使用适合您的用例的以下自定义 waitGroup 实现。(不过,它不是复制安全的。也没有/需要 wg.Add(int) 方法)


type CountedWait struct {

    wait  chan struct{}

    limit int

}


func NewCountedWait(limit int) *CountedWait {

    return &CountedWait{

        wait:  make(chan struct{}, limit),

        limit: limit,

    }

}


func (cwg *CountedWait) Done() {

    cwg.wait <- struct{}{}

}


func (cwg *CountedWait) Wait() {

    count := 0

    for count < cwg.limit {

        <-cwg.wait

        count += 1

    }

}

可以按如下方式使用:


func sampleGoroutine(z int, b string, wg *CountedWait) {


    success := false


    defer func() {

        if success == true {

            fmt.Printf("goroutine %d finished successfully\n", z)

            wg.Done()

        }

    }()


    fmt.Printf("goroutine %d started\n", z)

    time.Sleep(time.Second)


    if rand.Intn(10)%2 == 0 {

        success = true

    }

}


func main() {

    x := 10

    y := 3

    wg := NewCountedWait(y)


    for i := 0; i < x; i += 1 {

        // Wrap our work function with the local signalling logic

        go sampleGoroutine(i, "something", wg)

    }


    wg.Wait()


    fmt.Printf("%d out of %d goroutines finished successfully.\n", y, x)

}

3. 你也可以加入context2 以确保剩余的 goroutines 不会泄漏 你可能无法在 play.golang 上运行它,因为它有一些长时间的休眠。


下面是一个示例输出:(注意,可能有超过 y=3 个 goroutines 标记完成,但你只等到 3 个完成)



goroutine 9 started

goroutine 0 started

goroutine 1 started

goroutine 2 started

goroutine 3 started

goroutine 4 started

goroutine 5 started

goroutine 5 marking done

goroutine 6 started

goroutine 7 started

goroutine 7 marking done

goroutine 8 started

goroutine 3 marking done

continuing after 3 out of 10 goroutines finished successfully.

goroutine 9 will be killed, bcz cancel

goroutine 8 will be killed, bcz cancel

goroutine 6 will be killed, bcz cancel

goroutine 1 will be killed, bcz cancel

goroutine 0 will be killed, bcz cancel

goroutine 4 will be killed, bcz cancel

goroutine 2 will be killed, bcz cancel

查看完整回答
反对 回复 2023-03-29
  • 3 回答
  • 0 关注
  • 113 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信