为了账号安全,请及时绑定邮箱和手机立即绑定

是否有一些优雅的方式来暂停和恢复任何其他 goroutine?

是否有一些优雅的方式来暂停和恢复任何其他 goroutine?

Go
一只斗牛犬 2021-06-02 21:42:09
就我而言,我有数千个 goroutine 同时作为work(). 我也有一个sync()goroutine。当sync启动时,我需要任何其他的goroutine同步作业完成后暂停了一段时间。这是我的代码:var channels []chan intvar channels_mutex sync.Mutexfunc work() {  channel := make(chan int, 1)  channels_mutex.Lock()    channels = append(channels, channel)  channels_mutex.Unlock()  for {    for {      sync_stat := <- channel // blocked here      if sync_stat == 0 { // if sync complete        break        }    }    // Do some jobs    if (some condition) {      return    }  }}func sync() {  channels_mutex.Lock()  // do some sync  for int i := 0; i != len(channels); i++ {    channels[i] <- 0  }  channels_mutex.Unlock()}现在的问题是,由于<-总是在读取时阻塞,所以每次都sync_stat := <- channel阻塞。我知道如果通道关闭它不会被阻塞,但是因为我必须使用这个通道直到work()退出,而且我没有找到任何方法来重新打开一个关闭的通道。我怀疑自己走错了路,因此感谢您的帮助。是否有一些“优雅”的方式来暂停和恢复任何其他 goroutine?
查看完整描述

1 回答

?
不负相思意

TA贡献1777条经验 获得超10个赞

如果我理解正确,您需要 N 个工人和一个控制器,可以随意暂停、恢复和停止工人。下面的代码将做到这一点。


package main


import (

    "fmt"

    "runtime"

    "sync"

)


// Possible worker states.

const (

    Stopped = 0

    Paused  = 1

    Running = 2

)


// Maximum number of workers.

const WorkerCount = 1000


func main() {

    // Launch workers.

    var wg sync.WaitGroup

    wg.Add(WorkerCount + 1)


    workers := make([]chan int, WorkerCount)

    for i := range workers {

        workers[i] = make(chan int, 1)


        go func(i int) {

            worker(i, workers[i])

            wg.Done()

        }(i)

    }


    // Launch controller routine.

    go func() {

        controller(workers)

        wg.Done()

    }()


    // Wait for all goroutines to finish.

    wg.Wait()

}


func worker(id int, ws <-chan int) {

    state := Paused // Begin in the paused state.


    for {

        select {

        case state = <-ws:

            switch state {

            case Stopped:

                fmt.Printf("Worker %d: Stopped\n", id)

                return

            case Running:

                fmt.Printf("Worker %d: Running\n", id)

            case Paused:

                fmt.Printf("Worker %d: Paused\n", id)

            }


        default:

            // We use runtime.Gosched() to prevent a deadlock in this case.

            // It will not be needed of work is performed here which yields

            // to the scheduler.

            runtime.Gosched()


            if state == Paused {

                break

            }


            // Do actual work here.

        }

    }

}


// controller handles the current state of all workers. They can be

// instructed to be either running, paused or stopped entirely.

func controller(workers []chan int) {

    // Start workers

    setState(workers, Running)


    // Pause workers.

    setState(workers, Paused)


    // Unpause workers.

    setState(workers, Running)


    // Shutdown workers.

    setState(workers, Stopped)

}


// setState changes the state of all given workers.

func setState(workers []chan int, state int) {

    for _, w := range workers {

        w <- state

    }

}


查看完整回答
1 反对 回复 2021-06-07
  • 1 回答
  • 0 关注
  • 204 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信