为了账号安全,请及时绑定邮箱和手机立即绑定

将来自多个 go routines 的响应获取到一个数组中

将来自多个 go routines 的响应获取到一个数组中

Go
眼眸繁星 2023-06-05 17:18:28
我需要从多个 go 例程中获取响应并将它们放入一个数组中。我知道通道可用于此目的,但我不确定如何确保所有 go 例程都已完成结果处理。因此我正在使用等待组。代码func main() {  log.Info("Collecting ints")  var results []int32  for _, broker := range e.BrokersByBrokerID {      wg.Add(1)      go getInt32(&wg)  }  wg.Wait()  log.info("Collected")}func getInt32(wg *sync.WaitGroup) (int32, error) {  defer wg.Done()  // Just to show that this method may just return an error and no int32  err := broker.Open(config)  if err != nil && err != sarama.ErrAlreadyConnected {    return 0, fmt.Errorf("Cannot connect to broker '%v': %s", broker.ID(), err)  }  defer broker.Close()  return 1003, nil}我的问题如何将所有响应 int32(可能返回错误)放入我的 int32 数组,确保所有 go 例程都已完成处理工作并返回错误或 int?
查看完整描述

4 回答

?
繁花如伊

TA贡献2012条经验 获得超12个赞

如果您不处理作为 goroutine 启动的函数的返回值,它们将被丢弃。

您可以使用切片来收集结果,其中每个 goroutine 都可以接收将结果放入的索引,或者元素的地址。请注意,如果您使用它,则必须预先分配切片,并且只能写入属于 goroutine 的元素,您不能“触摸”其他元素,也不能附加到切片。

或者您可以使用一个通道,goroutines 在该通道上发送包含它们处理的项目的索引或 ID 的值,以便收集 goroutine 可以识别或排序它们。

请注意,这里不需要等待组,因为我们知道我们期望通道上的值与我们启动的 goroutine 一样多。

type result struct {

    task int32

    data int32

    err  error

}


func main() {

    tasks := []int32{1, 2, 3, 4}


    ch := make(chan result)


    for _, task := range tasks {

        go calcTask(task, ch)

    }


    // Collect results:

    results := make([]result, len(tasks))


    for i := range results {

        results[i] = <-ch

    }


    fmt.Printf("Results: %+v\n", results)

}


func calcTask(task int32, ch chan<- result) {

    if task > 2 {

        // Simulate failure

        ch <- result{task: task, err: fmt.Errorf("task %v failed", task)}

        return

    }


    // Simulate success

    ch <- result{task: task, data: task * 2, err: nil}

}

输出(在Go Playground上尝试):


Results: [{task:4 data:0 err:0x40e130} {task:1 data:2 err:<nil>} {task:2 data:4 err:<nil>} {task:3 data:0 err:0x40e138}]



查看完整回答
反对 回复 2023-06-05
?
杨__羊羊

TA贡献1943条经验 获得超7个赞

我也相信你必须使用频道,它必须是这样的:


package main


import (

    "fmt"

    "log"

    "sync"

)


var (

    BrokersByBrokerID = []int32{1, 2, 3}

)


type result struct {

    data string

    err string // you must use error type here

}


func main()  {

    var wg sync.WaitGroup

    var results []result

    ch := make(chan result)


    for _, broker := range BrokersByBrokerID {

        wg.Add(1)

        go getInt32(ch, &wg, broker)

    }


    go func() {

        for v := range ch {

            results = append(results, v)

        }

    }()


    wg.Wait()

    close(ch)


    log.Printf("collected %v", results)

}


func getInt32(ch chan result, wg *sync.WaitGroup, broker int32) {

    defer wg.Done()


    if broker == 1 {

        ch <- result{err: fmt.Sprintf("error: gor broker 1")}

        return

    }


    ch <- result{data: fmt.Sprintf("broker %d - ok", broker)}

}

结果将如下所示:


2019/02/05 15:26:28 collected [{broker 3 - ok } {broker 2 - ok } { error: gor broker 1}]



查看完整回答
反对 回复 2023-06-05
?
拉莫斯之舞

TA贡献1820条经验 获得超10个赞

package main


import (

    "fmt"

    "log"

    "sync"

)


var (

    BrokersByBrokerID = []int{1, 2, 3, 4}

)


type result struct {

    data string

    err  string // you must use error type here

}


func main() {

    var wg sync.WaitGroup

    var results []int

    ch := make(chan int)

    done := make(chan bool)

    for _, broker := range BrokersByBrokerID {

        wg.Add(1)


        go func(i int) {

            defer wg.Done()

            ch <- i

            if i == 4 {

                done <- true

            }


        }(broker)

    }

L:

    for {

        select {

        case v := <-ch:


            results = append(results, v)

            if len(results) == 4 {

                //<-done

                close(ch)

                break L

                

            }


        case _ = <-done:

            break

        }

    }


    fmt.Println("STOPPED")

    //<-done

    wg.Wait()


    log.Printf("collected %v", results)


}


查看完整回答
反对 回复 2023-06-05
?
月关宝盒

TA贡献1772条经验 获得超5个赞

package main


import (

    "fmt"

    "log"

    "sync"

    "time"

)


var (

    BrokersByBrokerID = []int{1, 2, 3, 4}

)


type result struct {

    data string

    err  string // you must use error type here

}


func main() {    

    var wg sync.WaitGroup.   

    var results []int  

    ch := make(chan int)  

    done := make(chan bool) 

    for _, broker := range BrokersByBrokerID {                      

       wg.Add(1)


        go func(i int) {

            defer wg.Done()

            ch <- i

            if i == 4 {

                done <- true

            } 


        }(broker)

    }

    

    for v := range ch {


        results = append(results, v)

        if len(results) == 4 {

            close(ch)

        }


    }


    fmt.Println("STOPPED")

    <-done

    wg.Wait()

    

    log.Printf("collected %v", results)



}



</pre>


查看完整回答
反对 回复 2023-06-05
  • 4 回答
  • 0 关注
  • 185 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信