为了账号安全,请及时绑定邮箱和手机立即绑定

使用 goroutines 和 context 创建可取消的 worker

使用 goroutines 和 context 创建可取消的 worker

Go
海绵宝宝撒 2022-11-28 16:58:47
我试图了解如何正确使用 goroutines 以及通道和上下文,以创建可取消的后台工作者。我熟悉使用在显式调用时可以取消的上下文,将它附加到 worker goroutine 应该可以让我停止 worker。但我无法弄清楚如何使用它来实现这一目标。下面的示例说明了一个从通道“urls”获取数据的 worker goroutine,它还带有一个可取消的上下文。//worker.gofunc Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {    fmt.Printf("Worker %d is starting\n", id)    select {    // placeholder for a channel writing the data from the URL    case url := <-urls:        fmt.Printf("Worker :%d received url :%s\n", id, url)    // checking if the process is cancelled    case <-ctx.Done():        fmt.Printf("Worker :%d exitting..\n", id)    }    fmt.Printf("Worker :%d done..\n", id)    wg.Done()}这对我不起作用有两个原因,对于无缓冲的通道,在没有 goroutines 读取的情况下写入它会阻塞它,所以一旦有更多数据添加到 urls 通道,发送方就会阻塞。一旦两个通道中的任何一个返回,它就会立即返回。我还尝试将选择包装在一个无限循环中,但在上下文引发错误后添加一个中断。func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {    fmt.Printf("Worker %d is starting\n", id)    for {        select {        // placeholder for a channel writing the data from the URL        case url := <-urls:            fmt.Printf("Worker :%d received url :%s\n", id, url)        // checking if the process is cancelled        case <-ctx.Done():            fmt.Printf("Worker :%d exitting..\n", id)            break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck        }    }    fmt.Printf("Worker :%d done..\n", id) // code is unreachable    wg.Done()}实施这样的事情的正确方法是什么?PS:有关设计此类工作进程的任何资源/参考资料也将有所帮助。
查看完整描述

2 回答

?
人到中年有点甜

TA贡献1895条经验 获得超7个赞

您可以用 return 代替 break,代码将起作用。

但是,更好的方法可能是:

  1. Worker 在 for / range 循环中消费通道

  2. 生产者应负责检测取消并关闭通道。for 循环将停止级联


查看完整回答
反对 回复 2022-11-28
?
不负相思意

TA贡献1777条经验 获得超10个赞

我专门为此做了一个 Go 包。你可以在这里找到它:https ://github.com/MicahParks/ctxerrpool

这是项目的示例README.md

package main


import (

    "bytes"

    "context"

    "log"

    "net/http"

    "os"

    "time"


    "github.com/MicahParks/ctxerrpool"

)


func main() {


    // Create an error handler that logs all errors.

    var errorHandler ctxerrpool.ErrorHandler

    errorHandler = func(pool ctxerrpool.Pool, err error) {

        log.Printf("An error occurred. Error: \"%s\".\n", err.Error())

    }


    // Create a worker pool with 4 workers.

    pool := ctxerrpool.New(4, errorHandler)


    // Create some variables to inherit through a closure.

    httpClient := &http.Client{}

    u := "https://golang.org"

    logger := log.New(os.Stdout, "status codes: ", 0)


    // Create the worker function.

    var work ctxerrpool.Work

    work = func(ctx context.Context) (err error) {


        // Create the HTTP request.

        var req *http.Request

        if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {

            return err

        }


        // Do the HTTP request.

        var resp *http.Response

        if resp, err = httpClient.Do(req); err != nil {

            return err

        }


        // Log the status code.

        logger.Println(resp.StatusCode)


        return nil

    }


    // Do the work 16 times.

    for i := 0; i < 16; i++ {


        // Create a context for the work.

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)

        defer cancel()


        // Send the work to the pool.

        pool.AddWorkItem(ctx, work)

    }


    // Wait for the pool to finish.

    pool.Wait()

}


查看完整回答
反对 回复 2022-11-28
  • 2 回答
  • 0 关注
  • 193 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信