3 回答
TA贡献1829条经验 获得超9个赞
就像其他人说的,你必须使用多个进程来实现真正的并行而不是线程,因为 GIL 限制阻止线程并发运行。
如果你想使用标准的多处理库(它基于启动多个进程),我建议使用一个worker 池。如果我理解正确,您想启动 100 多个并行实例。在一台主机上启动 100 多个进程会产生过多的开销。相反,创建一个 P 工人池,其中 P 是例如您机器中的核心数,并将 100 多个作业提交到池中。这很简单,网上有很多例子。此外,当您向池提交作业时,您可以提供回调函数来接收错误。这可能足以满足您的需求(有例子在这里)。
但是,上次我查看时,多处理中的池无法在多个主机(例如机器集群)之间分配工作。所以,如果你需要这样做,或者如果你需要一个更灵活的通信方案,比如能够在工作人员运行时向控制进程发送更新,我的建议是使用char4py(请注意,我是一个 char4py 开发人员,所以这是我有经验的地方)。
使用charm4py,您可以创建N 个工作进程,这些工作进程由运行时分布在P 个进程中(跨多个主机工作),并且工作进程只需通过远程方法调用即可与控制器通信。这是一个小例子:
from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded
import time
WORKER_ITERATIONS = 100
class Worker(Chare):
def __init__(self, controller):
self.controller = controller
@threaded
def work(self, x, done_future):
result = -1
try:
for i in range(WORKER_ITERATIONS):
if i % 20 == 0:
# send status update to controller
self.controller.progressUpdate(self.thisIndex, i, ret=True).get()
if i == 5 and self.thisIndex[0] % 2 == 0:
# trigger NameError on even-numbered workers
test[3] = 3
time.sleep(0.01)
result = x**2
except Exception as e:
# send error to controller
self.controller.collectError(self.thisIndex, e)
# send result to controller
self.contribute(result, Reducer.gather, done_future)
# This custom map is used to prevent workers from being created on process 0
# (where the controller is). Not strictly needed, but allows more timely
# controller output
class WorkerMap(ArrayMap):
def procNum(self, index):
return (index[0] % (charm.numPes() - 1)) + 1
class Controller(Chare):
def __init__(self, args):
self.startTime = time.time()
done_future = charm.createFuture()
# create 12 workers, which are distributed by charm4py among processes
workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))
# start work
for i in range(12):
workers[i].work(i, done_future)
print('Results are', done_future.get()) # wait for result
exit()
def progressUpdate(self, worker_id, current_step):
print(round(time.time() - self.startTime, 3), ': Worker', worker_id,
'progress', current_step * 100 / WORKER_ITERATIONS, '%')
# the controller can return a value here and the worker would receive it
def collectError(self, worker_id, error):
print(round(time.time() - self.startTime, 3), ': Got error', error,
'from worker', worker_id)
charm.start(Controller)
在此示例中,控制器将在发生状态更新和错误时打印它们。完成所有工作后,它将打印所有工人的最终结果。失败的工作人员的结果将为 -1。
进程数 P 在启动时给出。运行时将在可用进程之间分配 N 个工作程序。这发生在创建工作程序并且在此特定示例中没有动态负载平衡时。
另外,请注意,在charm4py 模型中,远程方法调用是异步的,并返回调用者可以阻塞的未来,但只有调用线程会阻塞(而不是整个过程)。
希望这会有所帮助。
TA贡献1865条经验 获得超7个赞
更好的方法是使用线程。如果您将要调用的脚本放入这个更大的脚本中的函数中,您可以让主函数根据需要多次调用此脚本,并让线程根据需要报告信息。您可以在此处阅读一些有关线程如何工作的信息。
添加回答
举报