我必须使用“spawn”来启动进程,因为我需要在进程之间传输 cuda 张量。但是使用“spawn”创建redis进程总是面临TypeError:无法pickle _thread.lock对象由于某种原因,这段代码删除了某些部分看来只有使用“fork”才能正常工作import redisfrom torch.multiprocessing import Processclass Buffer(Process): def __init__(self, name=0, num_peers=2, actor_queue=0, communicate_queue=0): Process.__init__(self) #some arguments self.actor_queue = actor_queue self.communicate_queue = communicate_queue pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) self.r = redis.Redis(connection_pool=pool) self.r.flushall() async def write(self, r): #do sth async def aggregate(self, r): #do sth def run(self): name_process = mp.current_process().name + str(mp.current_process().pid) print('starting...', name_process) loop = asyncio.get_event_loop() asyncio.set_event_loop(loop) tasks = asyncio.gather( loop.create_task(self.write(self.r)), loop.create_task(self.aggregate(self.r)), ) try: loop.run_until_complete(tasks) finally: loop.close()if __name__ == '__main__': mp.set_start_method('spawn') queue = mp.Queue(maxsize=5) queue.put('sth') name = 'yjsp' num_peers = 2 p =Buffer(name, num_peers, queue, c_queue) p.start()
1 回答
墨色风雨
TA贡献1853条经验 获得超6个赞
问题解决了!
我们应该在 run() 中定义池和其他东西
原因如下:线程存在于进程内部,并且进程旋转子进程以启用并行。线程需要锁来防止资源问题,例如多个进程获取相同的资源并导致死锁。
如果我们在 run() 中定义池,那么当我们进入 run() 方法时,我们就已经处于子进程中。
像这样
def run(self): pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) r = redis.Redis(connection_pool=pool) r.flushall()
添加回答
举报
0/150
提交
取消