为了账号安全,请及时绑定邮箱和手机立即绑定

WaitGroup.Wait() 超时

WaitGroup.Wait() 超时

Go
jeck猫 2021-11-08 19:02:25
为WaitGroup.Wait()分配超时的惯用方法是什么?我想这样做的原因是为了保护我的“调度程序”免于永远等待错误的“工人”。这会导致一些哲学问题(即,一旦出现错误的工人,系统如何可靠地继续运行?),但我认为这超出了这个问题的范围。我有一个我会提供的答案。现在我已经把它写下来了,它看起来并没有那么糟糕,但它仍然比它应该的更令人费解。我想知道是否有一些更简单、更惯用的方法,甚至是不使用 WaitGroups 的替代方法。塔。
查看完整描述

3 回答

?
温温酱

TA贡献1752条经验 获得超4个赞

大多数情况下,您在下面发布的解决方案都尽可能好。改进它的几个技巧:


或者,您可以关闭通道以发出完成信号,而不是在其上发送值,关闭通道上的接收操作始终可以立即进行。

最好使用defer语句来表示完成,即使函数突然终止,它也会执行。

此外,如果只有一个“作业”要等待,您可以完全省略WaitGroup并在作业完成时发送一个值或关闭通道(与您在select语句中使用的通道相同)。

指定持续1秒很简单,只要:timeout := time.Second。例如,指定 2 秒是:timeout := 2 * time.Second。您不需要转换,time.Second它已经是 type time.Duration,将它与一个无类型常量相乘2也会产生一个 type 值time.Duration。

我还将创建一个包装此功能的助手/实用程序函数。请注意,WaitGroup必须作为指针传递,否则副本将不会收到WaitGroup.Done()调用的“通知” 。就像是:


// waitTimeout waits for the waitgroup for the specified max timeout.

// Returns true if waiting timed out.

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {

    c := make(chan struct{})

    go func() {

        defer close(c)

        wg.Wait()

    }()

    select {

    case <-c:

        return false // completed normally

    case <-time.After(timeout):

        return true // timed out

    }

}

使用它:


if waitTimeout(&wg, time.Second) {

    fmt.Println("Timed out waiting for wait group")

} else {

    fmt.Println("Wait group finished")

}

在Go Playground上试一试。


查看完整回答
反对 回复 2021-11-08
?
慕桂英4014372

TA贡献1871条经验 获得超13个赞

我是这样做的:http : //play.golang.org/p/eWv0fRlLEC


go func() {

    wg.Wait()

    c <- struct{}{}

}()

timeout := time.Duration(1) * time.Second

fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)

select {

case <-c:

    fmt.Printf("Wait group finished\n")

case <-time.After(timeout):

    fmt.Printf("Timed out waiting for wait group\n")

}

fmt.Printf("Free at last\n")

它工作正常,但这是最好的方法吗?


查看完整回答
反对 回复 2021-11-08
?
慕尼黑5688855

TA贡献1848条经验 获得超2个赞

大多数现有答案都表明存在泄漏 goroutines。为WaitGroup.Wait分配超时的惯用方法是使用底层同步/原子包原语。我从@icza 答案中获取了代码并使用atomic包重写了它,并添加了上下文取消,因为这是通知超时的惯用方式。


package main


import (

    "context"

    "fmt"

    "sync/atomic"

    "time"

)


func main() {

    var submitCount int32

    // run this instead of wg.Add(1)

    atomic.AddInt32(&submitCount, 1)


    // run this instead of wg.Done()

    // atomic.AddInt32(&submitCount, -1)


    timeout := time.Second

    ctx, cancel := context.WithTimeout(context.Background(), timeout)

    defer cancel()

    fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)


    waitWithCtx(ctx, &submitCount)


    fmt.Println("Free at last")

}


// waitWithCtx returns when passed counter drops to zero

// or when context is cancelled

func waitWithCtx(ctx context.Context, counter *int32) {

    ticker := time.NewTicker(10 * time.Millisecond)

    for {

        select {

        case <-ctx.Done():

            return

        case <-ticker.C:

            if atomic.LoadInt32(counter) == 0 {

                return

            }

        }

    }

}


查看完整回答
反对 回复 2021-11-08
  • 3 回答
  • 0 关注
  • 304 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信