为了账号安全,请及时绑定邮箱和手机立即绑定

Go 中的网络爬虫

Go 中的网络爬虫

Go
梦里花落0921 2021-09-27 17:34:22
我正在尝试在 Go 中构建一个网络爬虫,我想在其中指定并发工作人员的最大数量。只要队列中有可供探索的链接,它们都会工作。当队列的元素少于工作人员时,工作人员应该大声喊叫,但如果找到更多链接,则继续。我试过的代码是const max_workers = 6// simulating links with intfunc crawl(wg *sync.WaitGroup, queue chan int) {    for element := range queue {           wg.Done() // why is defer here causing a deadlock?        fmt.Println("adding 2 new elements ")        if element%2 == 0 {            wg.Add(2)            queue <- (element*100 + 11)            queue <- (element*100 + 33)        }    }}func main() {    var wg sync.WaitGroup    queue := make(chan int, 10)    queue <- 0    queue <- 1    queue <- 2    queue <- 3    var min int    if (len(queue) < max_workers) {        min = len(queue)    } else {        min = max_workers    }    for i := 0; i < min; i++ {        wg.Add(1)        go crawl(&wg, queue)    }    wg.Wait()    close(queue)}链接到游乐场这似乎有效,但有一个问题:当我开始时,我必须用多个元素填充队列。我希望它从(单个)种子页面(在我的示例中queue <- 0)开始,然后动态地扩大/缩小工作池。我的问题是:我怎样才能获得行为?为什么 defer 会wg.Done()导致死锁?wg.Done()实际完成后功能正常吗?我认为没有defergoroutine 不会等待另一部分完成(在解析 HTML 的实际工作示例中可能需要更长的时间)。
查看完整描述

2 回答

?
芜湖不芜

TA贡献1796条经验 获得超7个赞

如果您使用自己喜欢的网络搜索来搜索“Go web crawler”(或“golang web crawler”),您会发现许多示例,包括: Go Tour Exercise: Web Crawler。在 Go 中也有一些关于并发的讨论涵盖了这种事情。


在 Go 中执行此操作的“标准”方法根本不需要涉及等待组。要回答您的一个问题,defer只有在函数返回时才运行排队的事情。您有一个长时间运行的函数,所以不要defer在这样的循环中使用。


“标准”方式是在他们自己的 goroutine 中启动你想要的任意数量的工人。他们都从同一个频道读取“作业”,在无事可做时阻塞。完成后,该通道将关闭并且它们都退出。


在像爬虫这样的情况下,工作人员会发现更多的“工作”要做,并希望将它们排入队列。你不希望他们写回同一个通道,因为它会有一些有限的缓冲量(或没有!),你最终会阻止所有试图排队更多工作的工人!


一个简单的解决方案是使用一个单独的通道(例如每个工作人员都有in <-chan Job, out chan<- Job)和一个单独的队列/过滤器 goroutine 来读取这些请求,将它们附加到一个切片上,它可以任意增长或做一些全局限制,并且从切片的头部馈送另一个通道(即从一个通道读取并写入另一个通道的简单 for-select 循环)。此代码通常还负责跟踪已完成的操作(例如,访问过的 URL 的映射)并丢弃传入的重复请求。


队列 goroutine 可能看起来像这样(这里的参数名称过于冗长):


type Job string


func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {

    var list []Job

    done := make(map[Job]bool)

    for {

        var send chan<- Job

        var item Job

        if len(list) > 0 {

            send = toWorkers

            item = list[0]

        }

        select {

        case send <- item:

            // We sent an item, remove it

            list = list[1:]

        case thing := <-fromWorkers:

            // Got a new thing

            if !done[thing] {

                list = append(list, thing)

                done[thing] = true

            }

        }

    }

}

在这个简单的例子中,一些事情被掩盖了。比如终止。如果“作业”是一些较大的结构,你想使用chan *Job和[]*Job替代。在这种情况下,您还需要将映射类型更改为您从作业中提取的某些键(例如, Job.URL可能),并且您list[0] = nil之前list = list[1:]想要删除对*Job指针的引用并让垃圾收集器更早地处理它.


编辑:关于干净地终止的一些说明。


有几种方法可以干净地终止上述代码。可以使用等待组,但是需要小心地放置 Add/Done 调用,并且您可能需要另一个 goroutine 来执行 Wait(然后关闭一个通道以开始关闭)。工作人员不应该关闭他们的输出通道,因为有多个工作人员并且您不能多次关闭通道;队列 goroutine 在不知道工作人员何时完成的情况下无法告诉何时关闭它对工作人员的通道。


过去,当我使用与上述非常相似的代码时,我在“队列”goroutine 中使用了本地“未完成”计数器(这避免了对互斥锁或等待组具有的任何同步开销的任何需要)。将作业发送给工作人员时,未完成作业的计数会增加。当工人说它已经完成时,它又减少了。我的代码碰巧有另一个通道(除了要排队的更多节点之外,我的“队列”还收集结果)。它在自己的频道上可能更干净,但可以使用现有频道上的特殊值(例如 nil 作业指针)。无论如何,有了这样一个计数器,本地列表上现有的长度检查只需要看到当列表为空并且是时候终止时没有任何未完成的事情;


例如:


    if len(list) > 0 {

        send = toWorkers

        item = list[0]

    } else if outstandingJobs == 0 {

        close(toWorkers)

        return

    }


查看完整回答
反对 回复 2021-09-27
  • 2 回答
  • 0 关注
  • 193 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信