为了账号安全,请及时绑定邮箱和手机立即绑定

Go并发循环逻辑

Go并发循环逻辑

Go
吃鸡游戏 2022-10-17 09:58:11
我刚刚开始使用 Go 并发并尝试创建一个调度 go 例程,该例程会将作业发送到在 jobchan 通道上侦听的工作池。如果一条消息通过 dispatchchan 通道进入我的调度函数并且我的其他 go 例程正忙,则该消息将附加到调度程序中的堆栈切片上,并且调度程序将在稍后有工作人员可用时尝试再次发送,和/或没有在 dispatchchan 上收到更多消息。这是因为 dispatchchan 和 jobchan 没有缓冲,并且工作人员正在运行的 go 例程会将其他消息附加到调度程序直到某个点,我不希望工作人员阻塞等待调度程序并造成死锁。这是我到目前为止提出的调度程序代码:func dispatch() {var stack []stringacount := 0for {    select {    case d := <-dispatchchan:        stack = append(stack, d)    case c := <-mw:        acount = acount + c    case jobchan <-stack[0]:        if len(stack) > 1 {            stack[0] = stack[len(stack)-1]            stack = stack[:len(stack)-1]        } else {            stack = nil        }    default:        if acount == 0 && len(stack) == 0 {            close(jobchan)            close(dispatchchan)            close(mw)            wg.Done()            return        }    }}完整示例位于https://play.golang.wiki/p/X6kXVNUn5N7mw 通道是一个缓冲通道,长度与 worker go 例程的数量相同。它充当工作池的信号量。如果工作程序正在执行 [m] 有意义的 [w]ork,它会在 mw 通道上抛出 int 1,当它完成工作并返回 for 循环监听 jobchan 时,它会在 mw 上抛出 int -1。这样,调度程序就知道工作池是否正在完成任何工作,或者该池是否处于空闲状态。如果池空闲并且堆栈上没有更多消息,则调度程序关闭通道并将控制权返回给主函数。这一切都很好,但我遇到的问题是堆栈本身的长度可能为零,所以在我尝试将堆栈 [0] 发送到 jobchan 的情况下,如果堆栈为空,则会出现越界错误。我想弄清楚的是如何确保当我遇到这种情况时,stack[0] 是否有一个值。我不希望这种情况向 jobchan 发送一个空字符串。任何帮助是极大的赞赏。如果有我应该考虑的更惯用的并发模式,我很想听听。我不是 100% 相信这个解决方案,但这是迄今为止我所获得的最远距离。
查看完整描述

1 回答

?
慕村225694

TA贡献1880条经验 获得超4个赞

这一切都很好,但我遇到的问题是堆栈本身的长度可能为零,所以在我尝试将堆栈 [0] 发送到 jobchan 的情况下,如果堆栈为空,则会出现越界错误。

我无法使用您的游乐场链接重现它,但它是可信的,因为至少有一名gofunc工作人员可能已经准备好在该频道上接收。

我的输出是Msgcnt: 0,这也很容易解释,因为在运行它时gofunc可能还没有准备好接收。这些操作的顺序没有定义。jobschandispatch()select

尝试创建一个调度 go 例程,将作业发送到在 jobchan 频道上监听的工作池

通道不需要调度程序。通道调度程序。

如果一条消息通过 dispatchchan 通道进入我的调度函数并且我的其他 go 例程正忙,则消息是 [...] 将 [...] 稍后当工作人员可用时再次发送,[...] 或否在 dispatchchan 上收到更多消息。

通过一些创造性的编辑,很容易将其变成接近缓冲通道定义的东西。它可以立即读取,可以占用一些limit无法立即发送的消息。你确实定义limit了,尽管它没有在你的代码中的其他地方使用。

在任何函数中,定义一个您不读取的变量都会导致编译时错误,例如limit declared but not used. 这种限制提高了代码质量并有助于识别类型。但是在包范围内,您已经摆脱了将未使用的定义limit为“全局”,从而避免了一个有用的错误——您没有限制任何东西。

不要使用全局变量。使用传递的参数来定义作用域,因为作用域的定义相当于用go关键字表达的功能并发。 将本地范围内定义的相关通道传递给包范围内定义的函数,以便您轻松跟踪它们的关系。并使用定向渠道来强制执行您的功能之间的生产者/消费者关系。稍后再谈。

回到“限制”,限制您排队的作业数量是有意义的,因为所有资源都是有限的,并且接受比您对处理的任何预期更多的消息需要比进程内存提供的更持久的存储。如果你觉得无论如何都没有义务满足这些要求那么一开始就不要接受“太多”的要求。

那么,什么功能有dispatchchandispatch()?在处理之前存储有限数量的待处理请求(如果有的话),然后将它们发送给下一个可用的工作人员?这正是缓冲通道的用途。

循环逻辑

谁“知道”你的程序何时完成? main()提供初始输入,但在 `dispatch() 中关闭所有 3 个通道:

            close(jobchan)       
            close(dispatchchan)       
            close(mw)

您的工作人员写入自己的作业队列,因此只有在工作人员完成写入后才能关闭传入的作业队列。但是,个别工作人员也不知道何时关闭作业队列,因为其他工作人员正在写入它。 没有人知道你的算法何时完成。这就是你的循环逻辑。

mw 通道是一个缓冲通道,长度与 worker go 例程的数量相同。它充当工作池的信号量。

这里有一个竞争条件。考虑所有n工人刚刚收到最后n一份工作的情况。他们每个人都读取jobschan并检查okdisptatcher继续运行它的select. 没有人现在正在写信dispatchchan或阅读,jobschan因此default案件会立即匹配。 len(stack)0并且没有电流job,因此dispatcher关闭所有频道,包括mw. 此后的某个时候,一名工作人员试图写入一个关闭的通道并出现恐慌。

所以最后我准备提供一些代码,但我还有一个问题:我没有一个明确的问题陈述来编写代码。

我刚刚开始使用 Go 并发并尝试创建一个调度 go 例程,该例程会将作业发送到在 jobchan 通道上侦听的工作池。

goroutine 之间的通道就像同步齿轮的齿。但是齿轮会转动到什么地方呢?你不是想保持时间,也不是制造发条玩具。你的齿轮可以转动,但成功会是什么样子?他们的转身?

让我们尝试为通道定义一个更具体的用例:给定一组任意长的持续时间作为标准输入*上的字符串,在其中一个工作n人员中休眠那么多秒。所以我们实际上有一个结果要返回,我们会说每个工作人员将返回运行持续时间的开始和结束时间。

  • 为了让它可以在操场上运行,我将使用硬编码的字节缓冲区来模拟标准输入。

package main


import (

    "bufio"

    "bytes"

    "fmt"

    "os"

    "strings"

    "sync"

    "time"

)


type SleepResult struct {

    worker_id int

    duration  time.Duration

    start     time.Time

    end       time.Time

}


func main() {

    var num_workers = 2

    workchan := make(chan time.Duration)

    resultschan := make(chan SleepResult)

    var wg sync.WaitGroup

    var resultswg sync.WaitGroup

    resultswg.Add(1)

    go results(&resultswg, resultschan)

    for i := 0; i < num_workers; i++ {

        wg.Add(1)

        go worker(i, &wg, workchan, resultschan)

    }

    // playground doesn't have stdin

    var input = bytes.NewBufferString(

        strings.Join([]string{

            "3ms",

            "1 seconds",

            "3600ms",

            "300 ms",

            "5s",

            "0.05min"}, "\n") + "\n")


    var scanner = bufio.NewScanner(input)

    for scanner.Scan() {

        text := scanner.Text()

        if dur, err := time.ParseDuration(text); err != nil {

            fmt.Fprintln(os.Stderr, "Invalid duration", text)

        } else {

            workchan <- dur

        }

    }

    close(workchan) // we know when our inputs are done

    wg.Wait()       // and when our jobs are done

    close(resultschan)

    resultswg.Wait()

}


func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) {

    for res := range resultschan {

        fmt.Printf("Worker %d: %s : %s => %s\n",

            res.worker_id, res.duration,

            res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))

    }

    wg.Done()

}


func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) {

    var res = SleepResult{worker_id: id}

    for dur := range jobchan {

        res.duration = dur

        res.start = time.Now()

        time.Sleep(res.duration)

        res.end = time.Now()

        resultschan <- res

    }

    wg.Done()

}

在这里,我使用了 2 个等待组,一个用于工人,一个用于结果。这确保我在main()结束之前完成了所有结果的编写。我通过让每个函数一次只做一件事来保持我的函数简单:main 读取输入,从中解析持续时间,然后将它们发送给下一个 worker。该results函数收集结果并将它们打印到标准输出。工人负责睡眠、读取jobchan和写入resultschan

workchan可以缓冲(或不缓冲,如本例所示);没关系,因为输入将以可以处理的速度读取。我们可以缓冲尽可能多的输入,但我们不能缓冲无限量。我已经将通道大小设置为1e6- 但一百万远小于无限。对于我的用例,我根本不需要做任何缓冲。

main知道输入何时完成并可以关闭jobschanmain还知道作业何时完成 ( wg.Wait()) 并可以关闭结果通道。worker关闭这些通道是对 goroutine和goroutine的一个重要信号results——它们可以区分一个空的通道和一个保证不会有任何新添加的通道。

for job := range jobchan {...}是您更详细的简写:

for {

  job, ok :=  <- jobchan

  if !ok {

    wg.Done()

    return

  }

  ...

}

请注意,此代码创建了 2 个工人,但它可以创建 20 个或 2000 个,甚至 1 个。无论池中有多少工人,程序都会运行。它可以处理任何数量的输入(尽管无休止的输入当然会导致无休止的程序)。它不会创建输出到输入的循环循环。如果您的用例需要作业来创建更多作业,那么这是一个更具挑战性的场景,通常可以通过仔细规划来避免。

我希望这能给你一些关于如何在 Go 应用程序中更好地使用并发的好主意。

https://play.golang.wiki/p/cZuI9YXypxI


查看完整回答
反对 回复 2022-10-17
  • 1 回答
  • 0 关注
  • 111 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信