
Web Scraping with ProcessPoolExecutor


回首忆惘然 2022-01-18 15:31:59
I wrote a program to crawl a single website and scrape certain data from it. I would like to speed up its execution using ProcessPoolExecutor, but I cannot work out how to convert it from single-threaded to concurrent. Specifically, when creating a job (via ProcessPoolExecutor.submit()), can I pass a class/object plus arguments instead of a function plus arguments? And, if so, how do I get the data from those jobs back into a queue that tracks visited pages and into a structure that holds the scraped content? I have been using the following as a starting point, and I have gone through the Queue and concurrent.futures documentation (the latter, frankly, is a bit over my head). I have also Googled/YouTubed/SO'ed quite a lot, to no avail.

from queue import Queue, Empty
from concurrent.futures import ProcessPoolExecutor


class Scraper:
    """
    Scrapes a single url
    """

    def __init__(self, url):
        self.url = url  # url of page to scrape
        self.internal_urls = None
        self.content = None
        self.scrape()

    def scrape(self):
        """
        Method(s) to request a page, scrape links from that page
        to other pages, and finally scrape actual content from the current page
        """
        # assume that code in this method would yield urls linked in current page
        self.internal_urls = set(scraped_urls)

        # and that code in this method would scrape a bit of actual content
        self.content = {'content1': content1, 'content2': content2, 'etc': etc}


class CrawlManager:
    """
    Manages a multiprocess crawl and scrape of a single site
    """

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.pool = ProcessPoolExecutor(max_workers=10)
        self.processed_urls = set([])
        self.queued_urls = Queue()
        self.queued_urls.put(self.seed_url)
        self.data = {}

    def crawl(self):
        while True:
            try:
                # get a url from the queue
                target_url = self.queued_urls.get(timeout=60)

                # check that the url hasn't already been processed
                if target_url not in self.processed_urls:
                    # add url to the processed list
                    self.processed_urls.add(target_url)
                    print(f'Processing url {target_url}')

                    # passing an object to the
                    # ProcessPoolExecutor... can this be done?
                    job = self.pool.submit(Scraper, target_url)
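
For reference, submit() takes any callable plus its arguments, and a class is itself a callable, so pool.submit(Scraper, target_url) is legal; with a ProcessPoolExecutor the worker process runs Scraper(target_url) and the resulting instance is pickled back to the parent. A minimal, self-contained sketch of that pattern (the DemoScraper stub below is illustrative, not the asker's class):

from concurrent.futures import ProcessPoolExecutor


class DemoScraper:
    # illustrative stand-in for the real Scraper; no actual scraping is done
    def __init__(self, url):
        self.url = url
        self.content = {'title': f'placeholder content for {url}'}


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as pool:
        # a class is a callable, so it can be submitted directly;
        # the worker calls DemoScraper(url) and the instance is pickled back
        future = pool.submit(DemoScraper, 'https://example.com')
        scraper = future.result()  # blocks until the worker finishes
        print(scraper.url, scraper.content)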

1 Answer

白衣染霜花


For anyone who comes across this page: I was able to figure this out myself.


Following @brad-solomon's suggestion, I switched from ProcessPoolExecutor to ThreadPoolExecutor to manage the concurrency aspects of this script (see his comments for more detail).
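
Since ThreadPoolExecutor and ProcessPoolExecutor share the same Executor interface, the switch itself is essentially a one-line change; a sketch of the swap (for illustration only, the original answer just describes it):

# was: from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

# threads share memory, so results never need to be pickled back, and a
# thread pool suits this I/O-bound (network-heavy) workload
pool = ThreadPoolExecutor(max_workers=10)  # was: ProcessPoolExecutor(max_workers=10)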


With regard to the original question, the key was to use ThreadPoolExecutor's add_done_callback method together with a modified Scraper.scrape and a new method, CrawlManager.proc_scraper_results, as shown below:


from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


class Scraper:
    """
    Scrapes a single url
    """

    def __init__(self, url):
        self.url = url  # url of page to scrape
        self.internal_urls = None
        self.content = None
        # note: scrape() is no longer called here; the executor calls it

    def scrape(self):
        """
        Method(s) to request a page, scrape links from that page
        to other pages, and finally scrape actual content from the current page
        """
        # assume that code in this method would yield urls linked in current page
        self.internal_urls = set(scraped_urls)

        # and that code in this method would scrape a bit of actual content
        self.content = {'content1': content1, 'content2': content2, 'etc': etc}

        # these three items will be passed to the callback
        # function within a future object
        return self.internal_urls, self.url, self.content


class CrawlManager:
    """
    Manages a multithreaded crawl and scrape of a single website
    """

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.pool = ThreadPoolExecutor(max_workers=10)
        self.processed_urls = set()
        self.queued_urls = Queue()
        self.queued_urls.put(self.seed_url)
        self.data = {}

    def proc_scraper_results(self, future):
        # get the items of interest from the future object
        internal_urls, url, content = future.result()

        # assign scraped data/content
        self.data[url] = content

        # also add scraped links to the queue if they
        # aren't already queued or already processed
        for link_url in internal_urls:
            if link_url not in self.queued_urls.queue and link_url not in self.processed_urls:
                self.queued_urls.put(link_url)

    def crawl(self):
        while True:
            try:
                # get a url from the queue
                target_url = self.queued_urls.get(timeout=60)

                # check that the url hasn't already been processed
                if target_url not in self.processed_urls:
                    # add url to the processed list
                    self.processed_urls.add(target_url)
                    print(f'Processing url {target_url}')

                    # add a job to the ThreadPoolExecutor (note, unlike the original
                    # question, we pass a bound method, not an object)
                    job = self.pool.submit(Scraper(target_url).scrape)

                    # to add_done_callback we pass another function, this one from
                    # CrawlManager; when it is called, it is passed a `future` object
                    job.add_done_callback(self.proc_scraper_results)

            except Empty:
                # nothing left in the queue for 60 seconds: stop crawling
                print("All done.")
                return
            except Exception as e:
                print(e)


if __name__ == '__main__':
    crawler = CrawlManager('www.mywebsite.com')
    crawler.crawl()

The result was a significant reduction in the program's run time.
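
For anyone unfamiliar with the callback mechanism used above, here is a stripped-down, self-contained sketch of add_done_callback (function names are illustrative and not part of the crawler):

from concurrent.futures import ThreadPoolExecutor


def fetch(url):
    # stand-in for a real HTTP request
    return f'body of {url}'


def on_done(future):
    # the callback receives the completed Future; result() yields fetch()'s return value
    print('finished:', future.result())


with ThreadPoolExecutor(max_workers=2) as pool:
    job = pool.submit(fetch, 'https://example.com')
    job.add_done_callback(on_done)
# exiting the with-block waits for the job (and its callback) to complete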

