2 回答
TA贡献1865条经验 获得超7个赞
有一个最小的可重现示例这样的东西,然后除此之外还删除了太多代码,最终得到的东西 (1) 可能过于简单化了,有危险而不是答案可能会错过标记,并且 (2)不可能如图所示运行(您需要将创建 Pool 和提交任务的代码包含在由语句控制的块中if __name__ == '__main__':。
但是根据您所展示的内容,我不认为 Pool 是适合您的解决方案;您应该根据需要创建 Process 实例。从进程中获取结果的一种方法是将它们存储在可共享的托管字典中,其键例如是创建结果的进程的进程 ID。
为了扩展您的示例,向子任务传递了两个参数,x并且y需要作为结果返回x**2 + 'y**2。子任务将产生两个孙任务实例,每个实例计算其参数的平方。然后,子任务将使用加法组合这些进程的返回值:
from multiprocessing import Process, Manager
import os
def child_task(results_dict, x, y):
# the child task spawns new tasks
p1 = Process(target=grandchild_task, args=(results_dict, x))
p1.start()
pid1 = p1.pid
p2 = Process(target=grandchild_task, args=(results_dict, y))
p2.start()
pid2 = p2.pid
p1.join()
p2.join()
pid = os.getpid()
results_dict[pid] = results_dict[pid1] + results_dict[pid2]
def grandchild_task(results_dict, n):
pid = os.getpid()
results_dict[pid] = n * n
def main():
manager = Manager()
results_dict = manager.dict()
p = Process(target=child_task, args=(results_dict, 2, 3))
p.start()
pid = p.pid
p.join()
# results will be stored with key p.pid:
print(results_dict[pid])
if __name__ == '__main__':
main()
印刷:
13
更新
例如,如果您确实遇到这样一种情况,child_task需要处理 N 个相同的调用,只是参数不同,但它必须产生一两个子进程,那么像以前一样使用 Pool,但另外传递一个要使用的托管child_task字典用于产生额外的进程(不尝试为此使用池)并检索它们的结果。
更新 2
我能弄清楚子进程本身使用池的唯一方法是使用模块ProcessPoolExecutor中的类concurrent.futures。当我试图用 做同样的事情时multiprocessing.Pool,我得到了一个错误,因为我们有守护进程试图创建自己的进程。但即使在这里,唯一的方法是池中的每个进程都有自己的进程池。您的计算机上只有有限数量的处理器/内核,因此除非在处理中混合了一些 I/O,否则您可以创建所有这些池,但进程将等待运行的机会。因此,尚不清楚将实现什么样的性能提升。还有关闭为child_task子进程创建的所有池的问题。通常一个ProcessPoolExecutor实例是使用with块,当该块终止时,将清理创建的池。但是child_task被重复调用并且显然不能使用with块,因为我们不希望不断地创建和销毁池。我来到这里有点麻烦:传递了第三个参数,True 或 False,指示是否child_task应该启动其池的关闭。此参数的默认值为 False,我们甚至懒得传递它。在检索到所有实际结果并且child_task进程现在空闲之后,我们提交 N 个具有虚拟值但shutdown设置为 True 的新任务。请注意,该ProcessPoolExecutor函数的map工作方式与类中的相同函数有很大不同Pool(阅读文档):
from concurrent.futures import ProcessPoolExecutor
import time
child_executor = None
def child_task(x, y, shutdown=False):
global child_executor
if child_executor is None:
child_executor = ProcessPoolExecutor(max_workers=1)
if shutdown:
if child_executor:
child_executor.shutdown(False)
child_executor = None
time.sleep(.2) # make sure another process in the pool gets the next task
return None
# the child task spawns new task(s)
future = child_executor.submit(grandchild_task, y)
# we can compute one of the results using the current process:
return grandchild_task(x) + future.result()
def grandchild_task(n):
return n * n
def main():
N_WORKERS = 2
with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:
# first call is (1, 2), second call is (3, 4):
results = [result for result in executor.map(child_task, (1, 3), (2, 4))]
print(results)
# force a shutdown
# need N_WORKERS invocations:
[result for result in executor.map(child_task, (0,) * N_WORKERS, (0,) * N_WORKERS, (True,) * N_WORKERS)]
if __name__ == '__main__':
main()
印刷:
[5, 25]
TA贡献1805条经验 获得超10个赞
检查此解决方案:
#!/usr/bin/python
# requires Python version 3.8 or higher
from multiprocessing import Queue, Process
import time
from random import randrange
import os
import psutil
# function to be run by each child process
def square(number):
sleep = randrange(5)
time.sleep(sleep)
print(f'Result is {number * number}, computed by pid {os.getpid()}...sleeping {sleep} secs')
# create a queue where all tasks will be placed
queue = Queue()
# indicate how many number of children you want the system to create to run the tasks
number_of_child_proceses = 5
# put all tasks in the queue above
for task in range(19):
queue.put(task)
# this the main entry/start of the program when you run
def main():
number_of_task = queue.qsize()
print(f'{"_" * 60}\nBatch: {number_of_task // number_of_child_proceses + 1} \n{"_" * 60}')
# don't create more number of children than the number of tasks. Also, in the last round, wait for all child process
# to complete so as to wrap up everything
if number_of_task <= number_of_child_proceses:
processes = [Process(target=square, args=(queue.get(),)) for _ in
range(number_of_task)]
for p in processes:
p.start()
p.join()
else:
processes = [Process(target=square, args=(queue.get(),)) for _ in range(number_of_child_proceses)]
for p in processes:
p.start()
# update count of remaining task
number_of_task = queue.qsize()
# run the program in a loop until no more task remains in the queue
while number_of_task:
current_process = psutil.Process()
children = current_process.children()
# if children process have completed assigned task but there is still more remaining tasks in the queue,
# assign them more tasks
if not len(children) and number_of_task:
print(f'\nAssigned tasks completed... reasigning the remaining {number_of_task} task(s) in the queue\n')
main()
# exit the loop if no more task in the queue to work on
print('\nAll tasks completed!!')
exit()
if __name__ == "__main__":
main()
添加回答
举报