1 回答
TA贡献1880条经验 获得超4个赞
这一切都很好,但我遇到的问题是堆栈本身的长度可能为零,所以在我尝试将堆栈 [0] 发送到 jobchan 的情况下,如果堆栈为空,则会出现越界错误。
我无法使用您的游乐场链接重现它,但它是可信的,因为至少有一名gofunc
工作人员可能已经准备好在该频道上接收。
我的输出是Msgcnt: 0
,这也很容易解释,因为在运行它时gofunc
可能还没有准备好接收。这些操作的顺序没有定义。jobschan
dispatch()
select
尝试创建一个调度 go 例程,将作业发送到在 jobchan 频道上监听的工作池
通道不需要调度程序。通道是调度程序。
如果一条消息通过 dispatchchan 通道进入我的调度函数并且我的其他 go 例程正忙,则消息是 [...] 将 [...] 稍后当工作人员可用时再次发送,[...] 或否在 dispatchchan 上收到更多消息。
通过一些创造性的编辑,很容易将其变成接近缓冲通道定义的东西。它可以立即读取,也可以占用一些limit
无法立即发送的消息。你确实定义limit
了,尽管它没有在你的代码中的其他地方使用。
在任何函数中,定义一个您不读取的变量都会导致编译时错误,例如limit declared but not used
. 这种限制提高了代码质量并有助于识别类型。但是在包范围内,您已经摆脱了将未使用的定义limit
为“全局”,从而避免了一个有用的错误——您没有限制任何东西。
不要使用全局变量。使用传递的参数来定义作用域,因为作用域的定义相当于用go
关键字表达的功能并发。 将本地范围内定义的相关通道传递给包范围内定义的函数,以便您轻松跟踪它们的关系。并使用定向渠道来强制执行您的功能之间的生产者/消费者关系。稍后再谈。
回到“限制”,限制您排队的作业数量是有意义的,因为所有资源都是有限的,并且接受比您对处理的任何预期更多的消息需要比进程内存提供的更持久的存储。如果你觉得无论如何都没有义务满足这些要求,那么一开始就不要接受“太多”的要求。
那么,什么功能有dispatchchan
和dispatch()
?在处理之前存储有限数量的待处理请求(如果有的话),然后将它们发送给下一个可用的工作人员?这正是缓冲通道的用途。
循环逻辑
谁“知道”你的程序何时完成? main()
提供初始输入,但在 `dispatch() 中关闭所有 3 个通道:
close(jobchan) close(dispatchchan) close(mw)
您的工作人员写入自己的作业队列,因此只有在工作人员完成写入后才能关闭传入的作业队列。但是,个别工作人员也不知道何时关闭作业队列,因为其他工作人员正在写入它。 没有人知道你的算法何时完成。这就是你的循环逻辑。
mw 通道是一个缓冲通道,长度与 worker go 例程的数量相同。它充当工作池的信号量。
这里有一个竞争条件。考虑所有n
工人刚刚收到最后n
一份工作的情况。他们每个人都读取jobschan
并检查ok
. disptatcher
继续运行它的select
. 没有人现在正在写信dispatchchan
或阅读,jobschan
因此default
案件会立即匹配。 len(stack)
是0
并且没有电流job
,因此dispatcher
关闭所有频道,包括mw
. 此后的某个时候,一名工作人员试图写入一个关闭的通道并出现恐慌。
所以最后我准备提供一些代码,但我还有一个问题:我没有一个明确的问题陈述来编写代码。
我刚刚开始使用 Go 并发并尝试创建一个调度 go 例程,该例程会将作业发送到在 jobchan 通道上侦听的工作池。
goroutine 之间的通道就像同步齿轮的齿。但是齿轮会转动到什么地方呢?你不是想保持时间,也不是制造发条玩具。你的齿轮可以转动,但成功会是什么样子?他们的转身?
让我们尝试为通道定义一个更具体的用例:给定一组任意长的持续时间作为标准输入*上的字符串,在其中一个工作n
人员中休眠那么多秒。所以我们实际上有一个结果要返回,我们会说每个工作人员将返回运行持续时间的开始和结束时间。
为了让它可以在操场上运行,我将使用硬编码的字节缓冲区来模拟标准输入。
package main
import (
"bufio"
"bytes"
"fmt"
"os"
"strings"
"sync"
"time"
)
type SleepResult struct {
worker_id int
duration time.Duration
start time.Time
end time.Time
}
func main() {
var num_workers = 2
workchan := make(chan time.Duration)
resultschan := make(chan SleepResult)
var wg sync.WaitGroup
var resultswg sync.WaitGroup
resultswg.Add(1)
go results(&resultswg, resultschan)
for i := 0; i < num_workers; i++ {
wg.Add(1)
go worker(i, &wg, workchan, resultschan)
}
// playground doesn't have stdin
var input = bytes.NewBufferString(
strings.Join([]string{
"3ms",
"1 seconds",
"3600ms",
"300 ms",
"5s",
"0.05min"}, "\n") + "\n")
var scanner = bufio.NewScanner(input)
for scanner.Scan() {
text := scanner.Text()
if dur, err := time.ParseDuration(text); err != nil {
fmt.Fprintln(os.Stderr, "Invalid duration", text)
} else {
workchan <- dur
}
}
close(workchan) // we know when our inputs are done
wg.Wait() // and when our jobs are done
close(resultschan)
resultswg.Wait()
}
func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) {
for res := range resultschan {
fmt.Printf("Worker %d: %s : %s => %s\n",
res.worker_id, res.duration,
res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
}
wg.Done()
}
func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) {
var res = SleepResult{worker_id: id}
for dur := range jobchan {
res.duration = dur
res.start = time.Now()
time.Sleep(res.duration)
res.end = time.Now()
resultschan <- res
}
wg.Done()
}
在这里,我使用了 2 个等待组,一个用于工人,一个用于结果。这确保我在main()
结束之前完成了所有结果的编写。我通过让每个函数一次只做一件事来保持我的函数简单:main 读取输入,从中解析持续时间,然后将它们发送给下一个 worker。该results
函数收集结果并将它们打印到标准输出。工人负责睡眠、读取jobchan
和写入resultschan
。
workchan
可以缓冲(或不缓冲,如本例所示);没关系,因为输入将以可以处理的速度读取。我们可以缓冲尽可能多的输入,但我们不能缓冲无限量。我已经将通道大小设置为1e6
- 但一百万远小于无限。对于我的用例,我根本不需要做任何缓冲。
main
知道输入何时完成并可以关闭jobschan
. main
还知道作业何时完成 ( wg.Wait()
) 并可以关闭结果通道。worker
关闭这些通道是对 goroutine和goroutine的一个重要信号results
——它们可以区分一个空的通道和一个保证不会有任何新添加的通道。
for job := range jobchan {...}
是您更详细的简写:
for {
job, ok := <- jobchan
if !ok {
wg.Done()
return
}
...
}
请注意,此代码创建了 2 个工人,但它可以创建 20 个或 2000 个,甚至 1 个。无论池中有多少工人,程序都会运行。它可以处理任何数量的输入(尽管无休止的输入当然会导致无休止的程序)。它不会创建输出到输入的循环循环。如果您的用例需要作业来创建更多作业,那么这是一个更具挑战性的场景,通常可以通过仔细规划来避免。
我希望这能给你一些关于如何在 Go 应用程序中更好地使用并发的好主意。
https://play.golang.wiki/p/cZuI9YXypxI
- 1 回答
- 0 关注
- 111 浏览
添加回答
举报