为了账号安全,请及时绑定邮箱和手机立即绑定

始终有 x 个 goroutine 随时运行

始终有 x 个 goroutine 随时运行

Go
有只小跳蛙 2021-08-10 15:15:25
我看到很多关于如何让 Go 等待 x 个 goroutines 完成的教程和示例,但我想做的是确保总是有 x 个 goroutine 在运行,所以一旦一个 goroutine 结束就会启动一个新的 goroutine .具体来说,我有几十万个“要做的事情”,它们正在处理一些来自 MySQL 的东西。所以它是这样工作的:db, err := sql.Open("mysql", connection_string)checkErr(err)defer db.Close()rows,err := db.Query(`SELECT id FROM table`)checkErr(err)defer rows.Close()var id uintfor rows.Next() {    err := rows.Scan(&id)    checkErr(err)    go processTheThing(id)    }checkErr(err)rows.Close()目前,将推出数十万线程processTheThing()。我需要的是最多启动 x 个(我们称之为 20 个)goroutines。因此,它首先为前 20 行启动 20 个,从那时起,它将在当前 goroutine 之一完成时为下一个 id 启动一个新的 goroutine。所以在任何时间点总是有 20 个在运行。我敢肯定,这是很简单/标准,但我似乎无法找到任何教程或例子或如何做到这一点的一个很好的解释。
查看完整描述

3 回答

?
郎朗坤

TA贡献1921条经验 获得超9个赞

您可能会发现Go Concurrency Patterns文章很有趣,尤其是有界并行性部分,它解释了您需要的确切模式。


您可以使用空结构的通道作为限制保护来控制并发工作程序 goroutine 的数量:


package main


import "fmt"


func main() {

    maxGoroutines := 10

    guard := make(chan struct{}, maxGoroutines)


    for i := 0; i < 30; i++ {

        guard <- struct{}{} // would block if guard channel is already filled

        go func(n int) {

            worker(n)

            <-guard

        }(i)

    }

}


func worker(i int) { fmt.Println("doing work on", i) }


查看完整回答
反对 回复 2021-08-10
?
ABOUTYOU

TA贡献1812条经验 获得超5个赞

在这里,我认为像这样简单的事情会起作用:


package main


import "fmt"


const MAX = 20


func main() {

    sem := make(chan int, MAX)

    for {

        sem <- 1 // will block if there is MAX ints in sem

        go func() {

            fmt.Println("hello again, world")

            <-sem // removes an int from sem, allowing another to proceed

        }()

    }

}


查看完整回答
反对 回复 2021-08-10
?
慕姐8265434

TA贡献1813条经验 获得超2个赞

感谢大家帮助我解决这个问题。但是,我认为没有人真正提供既有效又简单/易懂的东西,尽管你们都帮助我理解了该技术。


我最后所做的是我认为作为对我的具体问题的答案更容易理解和实用,所以我会在这里发布,以防其他人有同样的问题。


不知何故,这最终看起来很像 OneOfOne 发布的内容,这很棒,因为现在我明白了。但是 OneOfOne 的代码一开始我发现很难理解,因为将函数传递给函数使得理解什么是什么非常令人困惑。我认为这种方式更有意义:


package main


import (

"fmt"

"sync"

)


const xthreads = 5 // Total number of threads to use, excluding the main() thread


func doSomething(a int) {

    fmt.Println("My job is",a)

    return

}


func main() {

    var ch = make(chan int, 50) // This number 50 can be anything as long as it's larger than xthreads

    var wg sync.WaitGroup


    // This starts xthreads number of goroutines that wait for something to do

    wg.Add(xthreads)

    for i:=0; i<xthreads; i++ {

        go func() {

            for {

                a, ok := <-ch

                if !ok { // if there is nothing to do and the channel has been closed then end the goroutine

                    wg.Done()

                    return

                }

                doSomething(a) // do the thing

            }

        }()

    }


    // Now the jobs can be added to the channel, which is used as a queue

    for i:=0; i<50; i++ {

        ch <- i // add i to the queue

    }


    close(ch) // This tells the goroutines there's nothing else to do

    wg.Wait() // Wait for the threads to finish

}


查看完整回答
反对 回复 2021-08-10
  • 3 回答
  • 0 关注
  • 205 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信