为了账号安全,请及时绑定邮箱和手机立即绑定

Python递归多处理 - 线程太多

Python递归多处理 - 线程太多

繁星coding 2021-06-30 14:04:34
背景:Python 3.5.1,Windows 7我有一个保存大量文件和目录的网络驱动器。我正在尝试编写一个脚本来尽快解析所有这些,以找到与 RegEx 匹配的所有文件,并将这些文件复制到我的本地 PC 以供查看。大约有 3500 个目录和子目录,以及几百万个文件。我试图使其尽可能通用(即,不将代码写入此确切的文件结构),以便将其重用于其他网络驱动器。我的代码在针对小型网络驱动器运行时有效,这里的问题似乎是可扩展性。我已经使用多处理库尝试了一些东西,但似乎无法让它可靠地工作。我的想法是创建一个新作业来解析每个子目录以尽快工作。我有一个递归函数,它解析目录中的所有对象,然后为任何子目录调用自身,并根据 RegEx 检查它找到的任何文件。问题:如何在不使用 Pools 的情况下限制线程/进程的数量来实现我的目标?我试过的:如果我只使用 Process 作业,我会在RuntimeError: can't start new thread超过几百个线程启动后收到错误消息,并且它开始断开连接。我最终找到了大约一半的文件,因为一半的目录出错了(下面的代码)。为了限制总线程数,我尝试使用 Pool 方法,但是我不能根据这个问题将池对象传递给被调用的方法,这使得递归实现无法实现。为了解决这个问题,我尝试在 Pool 方法中调用 Processes ,但出现错误daemonic processes are not allowed to have children。我认为如果我可以限制并发线程的数量,那么我的解决方案将按设计工作。
查看完整描述

2 回答

?
皈依舞

TA贡献1851条经验 获得超3个赞

我无法完全按照我的意愿工作。os.walk 很慢,我想到的所有其他方法要么速度相似,要么由于线程太多而崩溃。

我最终使用了我在上面发布的类似方法,但不是在顶级目录开始递归,而是向下一层或两层,直到有几个目录。然后它将在这些目录中的每一个串行开始递归,这限制了足以成功完成的线程数。执行时间与 os.walk 类似,这可能会使实现更简单、更易读。


查看完整回答
反对 回复 2021-07-13
?
牧羊人nacy

TA贡献1862条经验 获得超7个赞

不要鄙视池的用处,尤其是当您想控制要创建的进程数量时。他们还负责管理您的工作人员(创建/启动/加入/分配工作块)并帮助您收集潜在结果。


正如您自己意识到的那样,您创建了太多进程,以至于您似乎耗尽了如此多的系统资源而无法创建更多进程。


此外,在代码中创建新进程受外部因素控制,即文件树中的文件夹数,这使得限制进程数变得非常困难。此外,创建新进程会在操作系统上带来相当大的开销,您甚至可能最终将这些开销浪费在空目录上。另外,进程之间的上下文切换非常昂贵。


根据您创建的进程数量,考虑到您声明的文件夹数量,您的进程基本上只会坐在那里闲置,而他们正在等待 CPU 时间份额来实际执行某些工作。除非您拥有一台拥有数千个内核的超级计算机,否则对于上述 CPU 时间会有很多争用。即使一个进程获得了一些 CPU 时间来工作,它也可能会花费相当多的时间来等待 I/O。


话虽如此,您可能想要研究使用线程来完成这样的任务。你可以在你的代码中做一些优化。从您的示例中,我看不出有任何理由将识别要复制的文件和实际将它们复制到不同的任务中。为什么不让您的员工立即复制他们发现与 RE 匹配的每个文件?


我会使用os.walk(我认为相当快)从主线程中创建有问题的目录中的文件列表,然后将该列表卸载到工作人员池,检查这些文件是否匹配并立即复制这些文件:


import os

import re

from multiprocessing.pool import ThreadPool


search_dirs = ["dir 1", "dir2"]

ptn = re.compile(r"your regex")

# your target dir definition


file_list = []


for topdir in search_dirs:

    for root, dirs, files in os.walk(topdir):

        for file in files:

            file_list.append(os.path.join(root, file))


def copier(path):

    if ptn.match(path):

        # do your shutil.copyfile with the try-except right here

        # obviously I did not want to start mindlessly copying around files on my box :)

        return path


with ThreadPool(processes=10) as pool:

    results = pool.map(copier, file_list)


# print all the processed files. For those that did not match, None is returned

print("\n".join([r for r in results if r]))

附带说明:不要手动连接您的路径 ( file[0] + "\\" + file[1]),而是os.path.join用于此。


查看完整回答
反对 回复 2021-07-13
  • 2 回答
  • 0 关注
  • 229 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信