为了账号安全,请及时绑定邮箱和手机立即绑定

并发处理程序正在阻塞

并发处理程序正在阻塞

Go
阿波罗的战车 2022-10-04 16:20:00
我们发现一个工作不正常。在处理程序中,我们将过滤即将到来的消息,然后将有效事件传递给一个 func 进行处理。该功能实现如下:mqtt.MessageHandlerfunc processEvent(i models.Foo) (string, error) {    var wg sync.WaitGroup    quit := make(chan bool)    errc := make(chan error)    done := make(chan error)    err := func1()    if err != nil {        return err    }    switch strings.ToUpper(i.Status) {    case "OK":        wg.Add(1)        go func() {            defer wg.Done()            err = longTimeTask1()            ch := done            if err != nil {                log.Error("%s", err.Error())                ch = errc            }            select {            case ch <- err:                return            case <-quit:                return            }        }()        wg.Add(1)        go func() {            defer wg.Done()            err = longTimeTask2()            ch := done            if err != nil {                ch = errc            }            select {            case ch <- err:                return            case <-quit:                return            }        }()        result := "processed"        count := 0        for {            select {            case err := <-errc:                close(quit)                log.Info("event: %s, %s", "", err.Error())                return "", err            case <-done:                count++                if count == 4 { // why 4???                    return result, nil                }            }        }        wg.Wait()        if err != nil {            log.Info("event: %s, %s", result, err.Error())            return result, err        }        close(quit)        close(errc)        close(done)        return result, nil    default:        return "", nil    }    return "", nil}我明白了,它正试图同步和长时间任务2()。但对我来说,理解起来相当复杂。计数和计数 == 4 的目的是什么?为什么在最后收盘?代码提示无法访问 。在此之前,这个功能运行良好。但最近或可能返回一些错误,这会破坏代码,这个fuc似乎被完全阻止了。你能帮我理解代码,找到潜在的问题并重构这部分吗?longTimeTask1()wg.Wait()longTimeTask1()longTimeTask2()
查看完整描述

2 回答

?
茅侃侃

TA贡献1842条经验 获得超21个赞

查看 ,代码似乎期望从通道接收四条消息。但是,此代码最多可以从两个 goroutine 生成两个这样的消息,因此这是一个错误。countdone


此外,如果任何一个 goroutine 返回错误,它就不会写入通道,所以这是另一个错误。done


另一种写法可能是:


...

result := "processed"

for {

    select {

       case err := <-errc:

          close(quit) // Tell the goroutines to terminate

          log.Info("event: %s, %s", "", err.Error())

          wg.Wait() // Wait for them to finish

          return "", err

  

       case <-done:

          count++

          if count == 2 {

              wg.Wait()

              return result, nil

          }    

}


查看完整回答
反对 回复 2022-10-04
?
肥皂起泡泡

TA贡献1829条经验 获得超6个赞

这正是 errgroup 包设计用于的分叉和联接并发类型:


func processEvent(ctx context.Context, i models.Foo) (string, error) {

    err := func1()

    if err != nil {

        return "", err

    }


    g, ctx := errgroup.WithContext(ctx)


    if strings.ToUpper(i.Status) != "OK" {

        return "", nil

    }


    g.Go(func() error { return longTimeTask1(ctx) })

    g.Go(func() error { return longTimeTask2(ctx) })


    if err := g.Wait(); err != nil {

        log.Printf("event: %v", err)

        return "", err

    }

    return "processed", nil

}

(https://play.golang.org/p/JNMKftQTLGs)


查看完整回答
反对 回复 2022-10-04
  • 2 回答
  • 0 关注
  • 52 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信