为了账号安全,请及时绑定邮箱和手机立即绑定

为什么在 python 多处理中关闭池之前不能使用 join()

为什么在 python 多处理中关闭池之前不能使用 join()

天涯尽头无女友 2023-09-12 17:13:35
我有一个类,它有一个方法可以进行一些并行计算,并且经常被调用。因此,我希望我的池在类的构造函数中初始化一次,而不是每次调用此方法时都创建一个新池。在此方法中,我想使用 apply_async() 为所有工作进程启动一个任务,然后等待(阻塞)并聚合每个任务的结果。我的代码如下所示:class Foo:     def __init__(self, ...):         # ...         self.pool = mp.Pool(mp.cpu_count())     def do_parallel_calculations(self, ...):         for _ in range(mp.cpu_count()):              self.pool.apply_async(calc_func, args=(...), callback=aggregate_result)                  # wait for results to be aggregated to a global var by the callback         self.pool.join()  # <-- ValueError: Pool is still running                  # do something with the aggregated result of all worker processes但是,当我运行此命令时,我在 self.pool.join() 中收到错误消息:“ValueError:池仍在运行”。现在,在所有示例中,我都看到 self.pool.close() 在 self.pool.join() 之前被调用,我认为这就是我收到此错误的原因,但我不想关闭我的池,因为我想要它在那里下次调用此方法时!我不能不使用 self.pool.join(),因为我需要一种方法来等待所有进程完成,并且我不想浪费地手动旋转,例如使用“while not global_flag: pass”。我可以做什么来实现我想要做的事情?为什么多重处理不允许我加入仍然开放的池?这似乎是一件完全合理的事情。
查看完整描述

2 回答

?
一只斗牛犬

TA贡献1784条经验 获得超2个赞

让我们用一个真实的例子来具体说明这一点:


import multiprocessing as mp



def calc_func(x):

    return x * x



class Foo:

    def __init__(self):

        self.pool = mp.Pool(mp.cpu_count())


    def do_parallel_calculations(self, values):

        results = []

        for value in values:

            results.append(self.pool.apply_async(calc_func, args=(value,)))

        for result in results:

            print(result.get())


if __name__ == '__main__':

    foo = Foo()

    foo.do_parallel_calculations([1,2,3])


查看完整回答
反对 回复 2023-09-12
?
千巷猫影

TA贡献1829条经验 获得超7个赞

我想我设法通过在 apply_async() 返回的 AsyncResult 对象上调用 get() 来做到这一点。所以代码就变成了:


def do_parallel_calculations(self, ...):

     results = []

     for _ in range(mp.cpu_count()):

          results.append(self.pool.apply_async(calc_func, args=(...)))

     aggregated_result = 0

     for result in results:

          aggregated_result += result.get()

其中 calc_func() 返回单个任务结果,不需要回调和全局变量。


这并不理想,因为我以任意顺序等待它们,而不是按照它们实际完成的顺序(最有效的方法是减少结果),但由于我只有 4 个核心,所以几乎不会被注意到。


查看完整回答
反对 回复 2023-09-12
  • 2 回答
  • 0 关注
  • 91 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信