3 回答
TA贡献1772条经验 获得超6个赞
第一个问题是
wg.Add
always 必须在它所代表的 goroutine(s) 之外。如果不是,则wg.Wait
可能会在 goutine 实际开始运行(并被调用wg.Add
)之前调用该调用,因此会“认为”没有什么可等待的。代码的第二个问题是它等待例程完成的方式有多种。有渠道
WaitGroup
,有done
渠道。仅使用其中之一。哪一个还取决于如何使用 goroutines 的结果。在这里,我们来到下一个问题。第三个问题是收集结果。目前,该代码仅打印/使用来自 goroutine 的单个结果。
for { ... }
在选择周围放置一个循环,如果通道关闭,则使用它return
来跳出循环。done
(请注意,您不需要在done
频道上发送任何内容,关闭它就足够了。)
改进版 0.0.1
所以这里的第一个版本(包括其他一些“代码清理”)带有done
用于关闭和WaitGroup
删除的通道:
func main() {
ordersFile, err := os.Open(ordersFilename)
if err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders := getOrderIDs(ordersFile)
files := Files{
filenames: getCSVsFromCurrentDir(),
}
var (
mu = new(sync.Mutex)
filenamesSize = len(files.filenames)
ch = make(chan map[string][]string, filenamesSize)
done = make(chan bool)
)
for i, filename := range files.filenames {
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
checkFile(currentFilename, orders, ch)
mu.Lock()
*filenamesSize--
mu.Unlock()
// TODO: This also accesses filenamesSize, so it also needs to be protected with the mutex:
if i == *filenamesSize {
done <- true
close(done)
}
}(filename, ch, i, orders, &filenamesSize, mu, done)
}
// Note: closing a channel is not really needed, so you can omit this:
defer close(ch)
for {
select {
case str := <-ch:
fmt.Printf("%+v\n", str)
case <-done:
return
}
}
}
改进版 0.0.2
但是,在您的情况下,我们有一些优势。我们确切地知道我们启动了多少个 goroutine,因此也知道我们期望有多少结果。(当然,如果每个 goroutine 返回一个当前代码所做的结果。)这为我们提供了另一种选择,因为我们可以使用另一个具有相同迭代次数的 for 循环来收集结果:
func main() {
ordersFile, err := os.Open(ordersFilename)
if err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders := getOrderIDs(ordersFile)
files := Files{
filenames: getCSVsFromCurrentDir(),
}
var (
// Note: a buffered channel helps speed things up. The size does not need to match the size of the items that will
// be passed through the channel. A fixed, small size is perfect here.
ch = make(chan map[string][]string, 5)
)
for _, filename := range files.filenames {
go func(filename string) {
// orders and channel are not variables of the loop and can be used without copying
checkFile(filename, orders, ch)
}(filename)
}
for range files.filenames {
str := <-ch
fmt.Printf("%+v\n", str)
}
}
简单很多,不是吗?希望有帮助!
TA贡献1796条经验 获得超4个赞
这段代码有很多错误。
您使用 WaitGroup 错误。Add 必须在主 goroutine 中调用,否则有可能在所有 Add 调用完成之前调用 Wait。
在初始化与 Done() 调用不匹配的 WaitGroup 之后,有一个无关的 Add(1) 调用,因此 Wait 永远不会返回(假设上面的点是固定的)。
您同时使用 WaitGroup 和 done 通道来表示完成。这充其量是多余的。
您正在读取 filenamesSize 而没有持有锁(在
if i == *filenamesSize
语句中)。这是一个竞争条件。首先,这种
i == *filenamesSize
情况毫无意义。Goroutines 以任意顺序执行,所以你不能确定 i == 0 的 goroutine 是最后一个减少 filenamesSize
这可以通过去掉大部分同步原语并在所有 goroutine 完成后简单地关闭 ch 通道来简化:
func main() {
ch := make(chan map[string][]string)
var wg WaitGroup
for _, filename := range getCSVsFromCurrentDir() {
filename := filename // capture loop var
wg.Add(1)
go func() {
checkFile(filename, orders, ch)
wg.Done()
}()
}
go func() {
wg.Wait() // after all goroutines are done...
close(ch) // let range loop below exit
}()
for str := range ch {
// ...
}
}
TA贡献2019条经验 获得超9个赞
不是答案,而是一些不适合评论框的评论。
在这部分代码中
func main() {
var (
ordersFile *os.File
files Files
orders Orders
err error
)
mu := new(sync.Mutex)
wg := &sync.WaitGroup{}
wg.Add(1)
最后一条语句是对 wg.Add 的调用,它看起来是悬空的。我的意思是我们很难理解什么会触发所需的 wg.Done 对应部分。在没有 wg.Done 的情况下调用 wg.Add 是一个错误,如果不以这样的方式编写它们很容易出错,我们无法立即找到它们。
在代码的那部分,显然是错误的
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
wg.Add(1)
defer wg.Done()
考虑到当例程执行时,您将 1 添加到等待组,父例程继续执行。看这个例子: https: //play.golang.org/p/N9Chaqkv4bd 主程序不等待等待组,因为它没有时间递增。
还有更多要说的,但我发现很难理解你的代码的目的,所以我不确定如何在不重写它的情况下进一步帮助你。
- 3 回答
- 0 关注
- 118 浏览
添加回答
举报