为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 goroutine 池

如何使用 goroutine 池

Go
红糖糍粑 2021-06-21 09:05:04
我想使用 Go 从雅虎财经下载股票价格电子表格。我将在自己的 goroutine 中为每只股票发出 http 请求。我有一个大约 2500 个符号的列表,但与其并行发出 2500 个请求,我更喜欢一次发出 250 个请求。在 Java 中,我会创建一个线程池并在线程空闲时重用它们。我试图找到类似的东西,一个 goroutine 池,如果你愿意的话,但找不到任何资源。如果有人能告诉我如何完成手头的任务或为我指出相同的资源,我将不胜感激。谢谢!
查看完整描述

3 回答

?
DIEA

TA贡献1820条经验 获得超2个赞

我想,最简单的方法是创建 250 个 goroutine 并将它们传递给一个通道,您可以使用该通道将链接从主 goroutine 传递到子 goroutine,并监听该通道。


当所有链接都传递给 goroutine 时,您关闭一个通道,所有 goroutine 就完成了它们的工作。


为了在孩子处理数据之前完成主 goroutine 的安全,您可以使用sync.WaitGroup.


下面是一些代码来说明我上面所说的(不是最终的工作版本,而是说明了这一点):


func worker(linkChan chan string, wg *sync.WaitGroup) {

   // Decreasing internal counter for wait-group as soon as goroutine finishes

   defer wg.Done()


   for url := range linkChan {

     // Analyze value and do the job here

   }

}


func main() {

    lCh := make(chan string)

    wg := new(sync.WaitGroup)


    // Adding routines to workgroup and running then

    for i := 0; i < 250; i++ {

        wg.Add(1)

        go worker(lCh, wg)

    }


    // Processing all links by spreading them to `free` goroutines

    for _, link := range yourLinksSlice {

        lCh <- link

    }


    // Closing channel (waiting in goroutines won't continue any more)

    close(lCh)


    // Waiting for all goroutines to finish (otherwise they die as main routine dies)

    wg.Wait()

}


查看完整回答
反对 回复 2021-06-28
?
潇湘沐

TA贡献1816条经验 获得超6个赞

你可以使用Go这个git repo 中的线程池实现库


这是关于如何使用通道作为线程池的好博客


来自博客的片段


    var (

 MaxWorker = os.Getenv("MAX_WORKERS")

 MaxQueue  = os.Getenv("MAX_QUEUE")

)


//Job represents the job to be run

type Job struct {

    Payload Payload

}


// A buffered channel that we can send work requests on.

var JobQueue chan Job


// Worker represents the worker that executes the job

type Worker struct {

    WorkerPool  chan chan Job

    JobChannel  chan Job

    quit        chan bool

}


func NewWorker(workerPool chan chan Job) Worker {

    return Worker{

        WorkerPool: workerPool,

        JobChannel: make(chan Job),

        quit:       make(chan bool)}

}


// Start method starts the run loop for the worker, listening for a quit channel in

// case we need to stop it

func (w Worker) Start() {

    go func() {

        for {

            // register the current worker into the worker queue.

            w.WorkerPool <- w.JobChannel


            select {

            case job := <-w.JobChannel:

                // we have received a work request.

                if err := job.Payload.UploadToS3(); err != nil {

                    log.Errorf("Error uploading to S3: %s", err.Error())

                }


            case <-w.quit:

                // we have received a signal to stop

                return

            }

        }

    }()

}


// Stop signals the worker to stop listening for work requests.

func (w Worker) Stop() {

    go func() {

        w.quit <- true

    }()


查看完整回答
反对 回复 2021-06-28
  • 3 回答
  • 0 关注
  • 195 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信