为了账号安全,请及时绑定邮箱和手机立即绑定

Golang:在“case”语句中为多个工作人员消费项目

Golang:在“case”语句中为多个工作人员消费项目

Go
开满天机 2022-10-10 18:00:37
我的消费者(从 运行)支持上下文取消和通过语句main从通道读取。case我可以使用上下文关闭消费者,效果很好。但是,当我在一个案例语句中生成多个工作人员时,每个工作人员都会从 获得相同的工作(消息)jobsChan,这不是我想要的:func (app *App) consumer() {    for {        select {        case <-app.ctx.Done():            app.infoLog.Print("Caught SIGINT, stopping.")            app.wg.Wait()            app.doneChan <- struct{}{} # main uses this channel to block itself until all goroutines are stopped            app.infoLog.Print("Shutting down the consumer...")            return        case job := <-app.jobsChan:            // PROBLEM here: wrong, each worker is given the same job            for workerNumber := 0; workerNumber < app.config.workers; workerNumber++ {                app.wg.Add(1)                go app.workerFunc(workerNumber, job)            }        }    }}func (app *App) workerFunc(id int, job Job) {    defer app.wg.Done()        ... actual worker code here ...}如何重写此代码以便我可以保留select频道app.ctx.Done并同时生成工人,以便每个工人从频道中选择下一条消息作为作业?我需要继续for/select监听ctx取消,但同时我需要生成 X 工作人员来读取来自jobsChan消费者的消息。这可能吗?想到的唯一选择是将 channel 直接传递给 spawnedworkerFunc并for job := range app.jobsChan在workerFunc. 但随后case job := <-app.jobsChan:消费者的整体变得毫无意义,我不知道如何重写它。澄清一下:当我运行应用程序时,我希望每个工作人员都有一个新的工作 id 从jobsChan- 但他们都处理相同的,例如 1,然后他们都处理下一个,例如 2#wrongWorker 0: start processing item 1Worker 2: start processing item 1Worker 1: start processing item 1
查看完整描述

1 回答

?
米脂

TA贡献1836条经验 获得超3个赞

您现有的代码明确地将相同的工作分配给所有工作人员。如果您有固定数量的工作人员,请为他们创建 goroutine(在初始化期间),并让他们收听频道:


for workerNumber:0;workerNumber<app.config.workers;workerNumber++ {

   go app.workerFunc(ctx,workerNumber,app.jobsChan)

}

在每个工作人员中,只需检查 jobQueue 和上下文取消。


换句话说,您不需要consumer, 将工作直接传递给工人。


查看完整回答
反对 回复 2022-10-10
  • 1 回答
  • 0 关注
  • 70 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信