为了账号安全,请及时绑定邮箱和手机立即绑定

使用 multiprocessing.Pool 时如何理解 multiprocessing.

使用 multiprocessing.Pool 时如何理解 multiprocessing.

Smart猫小萌 2021-09-25 13:23:03
为什么我不能把process在Pool成Queue?这里我的代码在使用时有效Pool并且可以获得Test实例属性。from multiprocessing import Poolfrom multiprocessing import Queueclass Test(object):    def __init__(self, num):        self.num = numif __name__ == '__main__':    p = Pool()    procs = []    for i in range(5):        proc = p.apply_async(Test, args=(i,))        procs.append(proc)    p.close()    for each in procs:        test = each.get(10)        print(test.num)    p.join()当我尝试使用Queuenot pythonlist来存储进程时,这是行不通的。我的代码:from multiprocessing import Poolfrom multiprocessing import Queueclass Test(object):    def __init__(self, num):        self.num = numif __name__ == '__main__':    p = Pool()    q = Queue()    for i in range(5):        proc = p.apply_async(Test, args=(i,))        q.put(proc)    p.close()    while not q.empty():        q.get()    p.join()错误消息:Traceback (most recent call last):  File "C:\Users\laich\AppData\Local\Programs\Python\Python36- 32\lib\multiprocessing\queues.py", line 234, in _feed    obj = _ForkingPickler.dumps(obj)  File "C:\Users\laich\AppData\Local\Programs\Python\Python36- 32\lib\multiprocessing\reduction.py", line 51, in dumps    cls(buf, protocol).dump(obj)TypeError: can't pickle _thread.lock objects我去看多处理文档:class multiprocessing.Queue([maxsize]) 返回使用管道和一些锁/信号量实现的进程共享队列。当一个进程第一次将一个项目放入队列时,一个馈线线程将启动,它将对象从缓冲区传输到管道中。标准库队列模块中的通常queue.Empty和queue.Full异常被引发以发出超时信号。Queue 实现了queue.Queue除了task_done()and之外的所有方法join()。这里说“放置一个项目”,这个项目不能是任何东西(python对象)?在我来说,我试图把process在Pool()成Queue。
查看完整描述

2 回答

?
牛魔王的故事

TA贡献1830条经验 获得超3个赞

您的Queue基于代码的代码至少存在两个问题。Pool.apply_async方法返回一个AsyncResult对象,而不是一个进程。您可以调用get该对象来获取相应过程的结果。考虑到这种差异,让我们看看您的代码:


proc = p.apply_async(Test, args=(i,)) # Returns an AsyncResult object

q.put(proc) # won't work

在您的情况下,第二行将始终失败。您放入队列的任何内容都必须是可腌制的,因为multiprocess.Queue使用序列化。这没有很好的文档记录,并且Python 的问题跟踪器中有一个未解决的问题来更新文档。问题是AsyncResult不能腌制。你可以自己试试:


import pickle

import multiprocessing as mp


with mp.Pool() as p:

    result = p.apply_async(lambda x: x, (1,))


pickle.dumps(result) # Error

AsyncResult内部包含一些锁定对象,它们不可序列化。让我们转到下一个问题:


while not q.empty():

    q.get()

如果我没记错的话,在上面的代码中,您要调用AsyncResult.get而不是Queue.get. 在这种情况下,您必须首先从队列中获取您的对象,然后在您的对象上调用相应的方法。但是,在您的代码中情况并非如此,因为AsyncResult它不可序列化。


查看完整回答
反对 回复 2021-09-25
?
HUX布斯

TA贡献1876条经验 获得超6个赞

AsyncResult对象不能被腌制,这multiprocessing.Queue是必需的。但是,这里不需要一个,因为队列没有在进程之间共享。这意味着您可以只使用常规的Queue.


from multiprocessing import Pool

#from multiprocessing import Queue

from queue import Queue



class Test(object):

    def __init__(self, num):

        self.num = num

        print('Test({!r}) created'.format(num))



if __name__ == '__main__':

    p = Pool()

    q = Queue()

    for i in range(5):

        proc = p.apply_async(Test, args=(i,))

        q.put(proc)

    p.close()

    while not q.empty():

        q.get()

    p.join()


    print('done')

输出:


Test(0)

Test(1)

Test(2)

Test(3)

Test(4)

done


查看完整回答
反对 回复 2021-09-25
  • 2 回答
  • 0 关注
  • 311 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号