为了账号安全,请及时绑定邮箱和手机立即绑定

程序因频道而挂起

程序因频道而挂起

Go
慕桂英4014372 2023-07-31 17:32:54
我想使用 goroutine 来批量处理来自不同客户的不同日期的请求。我的意思是 50 个消费者 goroutine 来消费数据库中的所有客户,以及 2 个日期消费者 goroutine 来消费日期切片。主要代码如下,但它挂起并且没有按预期退出。为什么它没有按预期退出?func Run(){    var syncWg sync.WaitGroup    syncWg.Add(1)    go SyncCustomerMetricsHistory(&syncWg)    syncWg.Wait()}func SyncCustomerMetricsHistory(wg *sync.WaitGroup){    defer wg.Done()    odb := orm.NewOrm()    start := time.Now()    logs.Info("start sync  customer metrics, time:[%v]", start)    qs := odb.QueryTable("gg_customer")    var customers []*db.GgCustomer    if num, err := qs.All(&customers); err != nil || num == 0 {        logs.Error("Get customer error, rows:[%v], err:[%v]", num, err)    }    customersChan := make(chan *db.GgCustomer, 50)    var wgC sync.WaitGroup    wgC.Add(50)    for i := 0; i < 50; i++ {        go syncCustomerMetricsHistory(customersChan, &wgC)    }    go func() {        for _, customer := range customers {            customersChan <- customer        }        close(customersChan)    }()    wgC.Wait()}func  syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer, wg *sync.WaitGroup){    defer wg.Done()    for customer := range customerChan{            dateChan := make(chan string, 2)            var wgD sync.WaitGroup            wgD.Add(2)            for i := 1; i < 2; i++{                go test(dateChan, customer, &wgD)            }            go func(){                for _, date := range GetAllYearDate(){                    dateChan <- date                }                close(dateChan)            }()            wgD.Wait()        }    }}func test(dateChan <- chan string, customer *db.GgCustomer, wg *sync.WaitGroup){    defer wg.Done()    for date := range dateChan{        fmt.Println(date, customer)    }}func  GetAllYearDate()  []string{  return []string{"2019-10-01", "2019-10-02"}}
查看完整描述

1 回答

?
四季花海

TA贡献1811条经验 获得超5个赞

我没有尝试运行这个(因为它需要额外的代码),但相信你的问题是:


wgD.Add(2)

for i := 1; i < 2; i++{

 go test(dateChan, customer, &wgD)

}

该 for 循环只会迭代一次,但您调用了 wgD.Add(2) (我认为您可能意味着循环迭代两次;尝试i <= 2)。


另一点反馈;您使用等待组的方式会起作用,但很难遵循(可能导致您没有发现问题);怎么样:


func Run(){

    SyncCustomerMetricsHistory()  // No wait group needed as this will not return before done

}


func SyncCustomerMetricsHistory(){

    odb := orm.NewOrm()

    start := time.Now()

    logs.Info("start sync  customer metrics, time:[%v]", start)


    qs := odb.QueryTable("gg_customer")

    var customers []*db.GgCustomer

    if num, err := qs.All(&customers); err != nil || num == 0 {

        logs.Error("Get customer error, rows:[%v], err:[%v]", num, err)

    }


    customersChan := make(chan *db.GgCustomer, 50)


    var wgC sync.WaitGroup

    wgC.Add(50)

    for i := 0; i < 50; i++ {

        go func() {

            syncCustomerMetricsHistory(customersChan)

            wgC.Done()

        }()

    }


    go func() {

        for _, customer := range customers {

            customersChan <- customer

        }

        close(customersChan)

    }()

    wgC.Wait()

}




func  syncCustomerMetricsHistory(customerChan <- chan *db.GgCustomer){

    for customer := range customerChan{

            dateChan := make(chan string, 2)

            var wgD sync.WaitGroup

            wgD.Add(2)

            for i := 1; i < 2; i++{

                go func() {

                    test(dateChan, customer)

                    wgD.Done()

                }()

            }

            go func(){

                for _, date := range GetAllYearDate(){

                    dateChan <- date

                }

                close(dateChan)

            }()

            wgD.Wait()

        }

    }

}

我认为这更容易理解,因为您可以看到 wg.Done() 被调用的位置。在两侧粘贴一些 fmt.Println 命令也非常容易,这使得调试此类问题变得更简单。


查看完整回答
反对 回复 2023-07-31
  • 1 回答
  • 0 关注
  • 138 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信