为了账号安全,请及时绑定邮箱和手机立即绑定

如何在执行一组工作人员之间正确延迟

如何在执行一组工作人员之间正确延迟

Go
不负相思意 2022-10-17 19:41:03
美好的一天,我正在尝试在工作人员执行之间实现正确的延迟,例如,工作人员需要完成 30 个任务并进入睡眠状态 5 秒,我如何在代码中跟踪已经完成了30 个任务只有在那之后才能睡5秒钟?下面是创建一个由30 个工作人员组成的池的代码,这些工作人员依次以无序的方式一次执行 30 个任务,这是代码:import (    "fmt"    "math/rand"    "sync"    "time")type Job struct {    id       int    randomno int}type Result struct {    job         Job    sumofdigits int}var jobs = make(chan Job, 10)var results = make(chan Result, 10)func digits(number int) int {    sum := 0    no := number    for no != 0 {        digit := no % 10        sum += digit        no /= 10    }    time.Sleep(2 * time.Second)    return sum}func worker(wg *sync.WaitGroup) {    for job := range jobs {        output := Result{job, digits(job.randomno)}        results <- output    }    wg.Done()}func createWorkerPool(noOfWorkers int) {    var wg sync.WaitGroup    for i := 0; i < noOfWorkers; i++ {        wg.Add(1)        go worker(&wg)    }    wg.Wait()    close(results)}func allocate(noOfJobs int) {    for i := 0; i < noOfJobs; i++ {        if i != 0 && i%30 == 0 {            fmt.Printf("SLEEPAGE 5 sec...")            time.Sleep(10 * time.Second)        }        randomno := rand.Intn(999)        job := Job{i, randomno}        jobs <- job    }    close(jobs)}func result(done chan bool) {    for result := range results {        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)    }    done <- true}func main() {    startTime := time.Now()    noOfJobs := 100    go allocate(noOfJobs)    done := make(chan bool)    go result(done)    noOfWorkers := 30    createWorkerPool(noOfWorkers)    <-done    endTime := time.Now()    diff := endTime.Sub(startTime)    fmt.Println("total time taken ", diff.Seconds(), "seconds")}播放:https ://go.dev/play/p/lehl7hoo-kp目前尚不清楚如何确保完成 30 个任务以及在哪里插入延迟,我将不胜感激
查看完整描述

1 回答

?
慕姐8265434

TA贡献1813条经验 获得超2个赞

好的,让我们从这个工作示例开始:


func Test_t(t *testing.T) {


    // just a published, this publishes result on a chan

    publish := func(s int, ch chan int, wg *sync.WaitGroup) {

        ch <- s // this is blocking!!!

        wg.Done()

    }


    wg := &sync.WaitGroup{}

    wg.Add(100)


    // we'll use done channel to notify the work is done

    res := make(chan int)

    done := make(chan struct{})

    // create worker that will notify that all results were published

    go func() {

        wg.Wait()

        done <- struct{}{}

    }()

    

    // let's create a jobs that publish on our res chan

    // please note all goroutines are created immediately

    for i := 0; i < 100; i++ {

        go publish(i, res, wg)

    }


    // lets get 30 args and then wait

    var resCounter int

forloop:

    for {

        select {

        case ss := <-res:

            println(ss)

            resCounter += 1

            // break the loop

            if resCounter%30 == 0 {

                // after receiving 30 results we are blocking this thread

                // no more results will be taken from the channel for 5 seconds

                println("received 30 results, waiting...")

                time.Sleep(5 * time.Second)

            }

        case <-done:

            // we are done here, let's break this infinite loop

            break forloop

        }

    }

}

我希望这进一步表明它是如何完成的。


那么,您的代码有什么问题?老实说,它看起来不错(我的意思是发布了 30 个结果,然后代码等待,然后是另外 30 个结果,等等),但问题是您想在哪里等待?

我猜有几种可能:

  • 创建工人(这就是您的代码现在的工作方式,正如我所见,它以 30 个包发布作业;请注意,您在digit函数中的 2 秒延迟仅适用于执行代码的 goroutine)

  • 触发工人(所以“等待”代码应该在工人函数中,不允许运行更多的工人 - 所以它必须观察发布了多少结果)

  • 处理结果(这就是我的代码的工作方式,并且正确的同步在forloop


查看完整回答
反对 回复 2022-10-17
  • 1 回答
  • 0 关注
  • 92 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信