2 回答
TA贡献1796条经验 获得超7个赞
如果您使用自己喜欢的网络搜索来搜索“Go web crawler”(或“golang web crawler”),您会发现许多示例,包括: Go Tour Exercise: Web Crawler。在 Go 中也有一些关于并发的讨论涵盖了这种事情。
在 Go 中执行此操作的“标准”方法根本不需要涉及等待组。要回答您的一个问题,defer只有在函数返回时才运行排队的事情。您有一个长时间运行的函数,所以不要defer在这样的循环中使用。
“标准”方式是在他们自己的 goroutine 中启动你想要的任意数量的工人。他们都从同一个频道读取“作业”,在无事可做时阻塞。完成后,该通道将关闭并且它们都退出。
在像爬虫这样的情况下,工作人员会发现更多的“工作”要做,并希望将它们排入队列。你不希望他们写回同一个通道,因为它会有一些有限的缓冲量(或没有!),你最终会阻止所有试图排队更多工作的工人!
一个简单的解决方案是使用一个单独的通道(例如每个工作人员都有in <-chan Job, out chan<- Job)和一个单独的队列/过滤器 goroutine 来读取这些请求,将它们附加到一个切片上,它可以任意增长或做一些全局限制,并且从切片的头部馈送另一个通道(即从一个通道读取并写入另一个通道的简单 for-select 循环)。此代码通常还负责跟踪已完成的操作(例如,访问过的 URL 的映射)并丢弃传入的重复请求。
队列 goroutine 可能看起来像这样(这里的参数名称过于冗长):
type Job string
func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {
var list []Job
done := make(map[Job]bool)
for {
var send chan<- Job
var item Job
if len(list) > 0 {
send = toWorkers
item = list[0]
}
select {
case send <- item:
// We sent an item, remove it
list = list[1:]
case thing := <-fromWorkers:
// Got a new thing
if !done[thing] {
list = append(list, thing)
done[thing] = true
}
}
}
}
在这个简单的例子中,一些事情被掩盖了。比如终止。如果“作业”是一些较大的结构,你想使用chan *Job和[]*Job替代。在这种情况下,您还需要将映射类型更改为您从作业中提取的某些键(例如, Job.URL可能),并且您list[0] = nil之前list = list[1:]想要删除对*Job指针的引用并让垃圾收集器更早地处理它.
编辑:关于干净地终止的一些说明。
有几种方法可以干净地终止上述代码。可以使用等待组,但是需要小心地放置 Add/Done 调用,并且您可能需要另一个 goroutine 来执行 Wait(然后关闭一个通道以开始关闭)。工作人员不应该关闭他们的输出通道,因为有多个工作人员并且您不能多次关闭通道;队列 goroutine 在不知道工作人员何时完成的情况下无法告诉何时关闭它对工作人员的通道。
过去,当我使用与上述非常相似的代码时,我在“队列”goroutine 中使用了本地“未完成”计数器(这避免了对互斥锁或等待组具有的任何同步开销的任何需要)。将作业发送给工作人员时,未完成作业的计数会增加。当工人说它已经完成时,它又减少了。我的代码碰巧有另一个通道(除了要排队的更多节点之外,我的“队列”还收集结果)。它在自己的频道上可能更干净,但可以使用现有频道上的特殊值(例如 nil 作业指针)。无论如何,有了这样一个计数器,本地列表上现有的长度检查只需要看到当列表为空并且是时候终止时没有任何未完成的事情;
例如:
if len(list) > 0 {
send = toWorkers
item = list[0]
} else if outstandingJobs == 0 {
close(toWorkers)
return
}
- 2 回答
- 0 关注
- 193 浏览
添加回答
举报