为了账号安全,请及时绑定邮箱和手机立即绑定

goroutine 等待频道的响应并继续

goroutine 等待频道的响应并继续

Go
30秒到达战场 2022-06-01 16:49:26
我正在学习并发,我想实现一个简单的示例,该示例从矩阵中获取行并将值的数组(切片)添加到每一行。由于我使用的是通道,因此我尝试等待每一行从 goroutine 获得相应的结果。但是,这并不比仅同步执行此操作更好。如何让每一行等待它们各自的结果并允许其他行同时计算它们的结果?https://play.golang.org/p/uCOGwOBeIQLpackage mainimport "fmt"/*Array:0 1 2 3 4 5 6 7 8 9+Matrix:1 0 0 0 0 0 0 0 0 00 1 0 0 0 0 0 0 0 00 0 1 0 0 0 0 0 0 00 0 0 1 0 0 0 0 0 00 0 0 0 1 0 0 0 0 00 0 0 0 0 1 0 0 0 00 0 0 0 0 0 1 0 0 00 0 0 0 0 0 0 1 0 00 0 0 0 0 0 0 0 1 00 0 0 0 0 0 0 0 0 1-> Expected result:1 1 2 3 4 5 6 7 8 90 2 2 3 4 5 6 7 8 90 1 3 3 4 5 6 7 8 90 1 2 4 4 5 6 7 8 90 1 2 3 5 5 6 7 8 90 1 2 3 4 6 6 7 8 90 1 2 3 4 5 7 7 8 90 1 2 3 4 5 6 8 8 90 1 2 3 4 5 6 7 9 90 1 2 3 4 5 6 7 8 10*/func main() {    numbers := []int {0,1,2,3,4,5,6,7,8,9}    matrix := [][]int{        {1,0,0,0,0,0,0,0,0,0},        {0,1,0,0,0,0,0,0,0,0},        {0,0,1,0,0,0,0,0,0,0},        {0,0,0,1,0,0,0,0,0,0},        {0,0,0,0,1,0,0,0,0,0},        {0,0,0,0,0,1,0,0,0,0},        {0,0,0,0,0,0,1,0,0,0},        {0,0,0,0,0,0,0,1,0,0},        {0,0,0,0,0,0,0,0,1,0},        {0,0,0,0,0,0,0,0,0,1},    }    rmatrix := make([][]int, 10)    for i, row := range matrix {        cResult := make(chan []int)        go func(row []int, numbers []int, c chan <- []int) {            c <- addRow(row,numbers)        }(row,numbers,cResult)        //this read from the channel will block until the goroutine sends its result over the channel        rmatrix[i] = <- cResult    }    fmt.Println(rmatrix)}func addRow(row []int, numbers []int) []int{    result := make([]int, len(row))    for i,e := range row {        result[i] = e + numbers[i];    }    return result}
查看完整描述

3 回答

?
Helenr

TA贡献1780条经验 获得超4个赞

我需要使用 async.WaitGroup并直接分配调用的结果(以保证它们返回到索引行)。谢谢@彼得


package main


import (

    "fmt"

    "sync"

)


func main() {

    numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}


    matrix := [][]int{

        {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},

        {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},

        {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},

        {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},

        {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},

        {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},

        {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},

        {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},

        {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},

        {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},

    }


    rmatrix := make([][]int, 10)

    var waitGroup sync.WaitGroup


    for i, row := range matrix {

        waitGroup.Add(1)

        go func(i int, row []int) {

            rmatrix[i] = addRow(row, numbers)

            waitGroup.Done()

        }(i, row)

    }

    waitGroup.Wait()

    fmt.Println(rmatrix)

}


func addRow(row []int, numbers []int) []int {

    result := make([]int, len(row))

    for i, e := range row {

        result[i] = e + numbers[i]

    }

    return result

}


查看完整回答
反对 回复 2022-06-01
?
慕容3067478

TA贡献1773条经验 获得超3个赞

这个例子产生了较少数量的 goroutine,并且也保证了正确的顺序,不管哪个 goroutine 先完成了它的处理。


package main


import (

    "fmt"

    "sync"

)


type rowRes struct {

    index  int

    result *[]int

}


func addRow(index int, row []int, numbers []int) rowRes {

    result := make([]int, len(row))

    for i, e := range row {

        result[i] = e + numbers[i]

    }

    return rowRes{

        index:  index,

        result: &result,

    }

}


func main() {

    numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}


    matrix := [][]int{

        {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},

        {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},

        {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},

        {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},

        {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},

        {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},

        {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},

        {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},

        {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},

        {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},

    }

    rmatrix := make([][]int, 10)


    // Buffered channel

    rowChan := make(chan rowRes, 10)


    wg := sync.WaitGroup{}


    // Reciever goroutine

    go recv(rowChan, rmatrix)


    for i := range matrix {

        wg.Add(1)

        go func(index int, row []int, w *sync.WaitGroup) {

            rowChan <- addRow(index, row, numbers)

            w.Done()

        }(i, matrix[i], &wg)

    }

    wg.Wait()

    close(rowChan)

    fmt.Println(rmatrix)

}


func recv(res chan rowRes, rmatrix [][]int) {

    for {

        select {

        case k, ok := <-res:

            if !ok {

                return

            }

            rmatrix[k.index] = *k.result

        }

    }

}



查看完整回答
反对 回复 2022-06-01
?
杨魅力

TA贡献1811条经验 获得超6个赞

流水线法



taskChannel := make(chan string,1000); // Set up the task queue

wg := sync.WaitGroup


// Task release

wg.add(1)

go func(&wg,taskChannel) {

      defer wg.Down()

      for i in "task list" {

        taskChannel <- "Stuff the characters you want to deal with here"

      }


    // After the task is sent and closed

    close(taskChannel)

}(wg *sync.WaitGroup,taskChannel chan string)


// Task execution

go func(&wg,taskChannel,1000) {

    defer wg.Down()

    limit := make(chan bool,limitNumber); // Limit the number of concurrent

    tg := sync.WaitGroup

    loop:

    for {

      select {

      case task,over := <-taskChannel:

            if !over {  // If there are no more tasks, quit

                tg.Wait()  // Wait for all tasks to end

                break loop

            }


            tg.Add(1)

            limit<-true

            go func(&tg,limitm) {

                defer func() {

                    <-limit

                    tg.Down()

                }

                // Business processing logic, processing tasks

            }(tg *sync.WaitGroup,limit chan bool,task string)

      }

    }

}(wg *sync.WaitGroup,taskChannel chan string,limitNumber int)


wg.Wait()

希望能帮到你


查看完整回答
反对 回复 2022-06-01
  • 3 回答
  • 0 关注
  • 103 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号