为了账号安全,请及时绑定邮箱和手机立即绑定

Goroutine-停止运行进程

Goroutine-停止运行进程

Go
慕码人8056858 2022-07-11 15:56:58
我使用以下代码在大多数情况下都可以正常工作,以防我们使用一些长时间运行的进程,它不会在程序内部停止并不会结束(这里我限制为 60 秒的示例)我希望每项工作在5 秒后终止(即使没有完成工作也终止进程),我怎样才能在 不 改变功能的情况下做到这一点myLongRunningFunc。我知道这不是直接解决它,我可以使用任何技巧吗?这是一些最小的可重现示例https://play.golang.org/p/a0RWY4bYWMtpackage mainimport (    "context"    "errors"    "fmt"    "time"    "github.com/gammazero/workerpool")func main() {    // here define a timeout for 5 sec,    // the task should be terminate after 5 sec    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)    defer cancel()    runner := newRunner(ctx, 10)    runner.do(job{        Name: "a",        Task: func() jobResult {            select {            case <-ctx.Done():                return jobResult{Error: errors.New("Timedout, exiting")}            default:                myLongRunningFunc("A job")            }            return jobResult{Data: "from a"}        },    })    runner.do(job{        Name: "b",        Task: func() jobResult {            select {            case <-ctx.Done():                return jobResult{Error: errors.New("Timeouts, exiting")}            default:                myLongRunningFunc("B job")            }            return jobResult{Data: "from b"}        },    })    results := runner.getjobResults()    fmt.Println(results)    time.Sleep(time.Second * 60)}func myLongRunningFunc(name string) {    for i := 0; i < 100000; i++ {        time.Sleep(time.Second * 1)        msg := "job" + name + " running..\n"        fmt.Println(msg)    }}type runner struct {    *workerpool.WorkerPool    ctx     context.Context    kill    chan struct{}    result  chan jobResult    results []jobResult}func (r *runner) processResults() {    for {        select {        case res, ok := <-r.result:            if !ok {                goto Done            }            r.results = append(r.results, res)        }    }Done:    <-r.kill}当我使用圣坛频道时,编辑不相关
查看完整描述

2 回答

?
慕少森

TA贡献2019条经验 获得超9个赞

我希望每个作业在 5 秒后终止(即使它没有完成工作也终止进程),我怎样才能在不更改函数 myLongRunningFunc 的情况下做到这一点。


然后你只需添加一个 5 秒的服务员然后退出。


package main


import (

    "context"

    "errors"

    "fmt"

    "time"


    "github.com/gammazero/workerpool"

)


func main() {

    go func() {

        // here define a timeout for 5 sec,

        // the task should be terminate after 5 sec

        ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)

        defer cancel()


        runner := newRunner(ctx, 10)


        runner.do(job{

            Name: "a",

            Task: func() jobResult {

                select {

                case <-ctx.Done():

                    return jobResult{Error: errors.New("Timedout, exiting")}

                default:

                    myLongRunningFunc("A job")

                }

                return jobResult{Data: "from a"}

            },

        })


        runner.do(job{

            Name: "b",

            Task: func() jobResult {

                select {

                case <-ctx.Done():

                    return jobResult{Error: errors.New("Timeouts, exiting")}

                default:

                    myLongRunningFunc("B job")

                }


                return jobResult{Data: "from b"}

            },

        })


        results := runner.getjobResults()

        fmt.Println(results)

        time.Sleep(time.Second * 60)

    }()

    <-time.After(time.Second * 5)

}


func myLongRunningFunc(name string) {

    for i := 0; i < 100000; i++ {

        time.Sleep(time.Second * 1)

        msg := "job" + name + " running..\n"

        fmt.Println(msg)

    }

}


type runner struct {

    *workerpool.WorkerPool

    ctx     context.Context

    kill    chan struct{}

    result  chan jobResult

    results []jobResult

}


func (r *runner) processResults() {

    for {

        select {

        case res, ok := <-r.result:

            if !ok {

                goto Done

            }

            r.results = append(r.results, res)

        }

    }

Done:

    <-r.kill

}


func newRunner(ctx context.Context, numRunners int) *runner {

    r := &runner{

        WorkerPool: workerpool.New(numRunners),

        ctx:        ctx,

        kill:       make(chan struct{}),

        result:     make(chan jobResult),

    }

    go r.processResults()

    return r

}


func (r *runner) do(j job) {

    r.Submit(r.wrap(&j))

}


func (r *runner) getjobResults() []jobResult {

    r.StopWait()

    close(r.result)

    r.kill <- struct{}{}

    return r.results

}


func (r *runner) wrap(job *job) func() {

    return func() {

        job.result = make(chan jobResult)

        go job.Run()

        select {

        case res := <-job.result:

            r.result <- res

        case <-r.ctx.Done():

            fmt.Printf("Job '%s' should stop here\n", job.Name)

            r.result <- jobResult{name: job.Name, Error: r.ctx.Err()}

        }

    }

}


type job struct {

    Name    string

    Task    func() jobResult

    Context context.Context

    result  chan jobResult

    stopped chan struct{}

    done    context.CancelFunc

}


func (j *job) Run() {

    result := j.Task()

    result.name = j.Name

    j.result <- result

}


type jobResult struct {

    name  string

    Error error

    Data  interface{}

}


查看完整回答
反对 回复 2022-07-11
?
海绵宝宝撒

TA贡献1809条经验 获得超8个赞

我认为不可能从外部 goroutine 中停止 goroutine。您可以检查它是否超时,但是,您不能停止它。

您可以做的是通过通道向 goroutine 发送消息,在这种情况下可以对其进行监视和停止。

你可以在这里找到例子


查看完整回答
反对 回复 2022-07-11
  • 2 回答
  • 0 关注
  • 147 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信