为了账号安全,请及时绑定邮箱和手机立即绑定

从单个脚本运行多个 python 脚本,并在它们之间来回通信?

从单个脚本运行多个 python 脚本,并在它们之间来回通信?

慕后森 2021-08-24 19:14:39
我有一个我编写的脚本,我可以将参数传递给它,并且我想使用唯一参数启动多个同时迭代(可能是 100+)。我的计划是编写另一个 python 脚本,然后启动这些下标/进程,但是为了有效,我需要该脚本能够监视下标是否有任何错误。有没有什么直接的方法可以做到这一点,或者有一个提供这个功能的库?我一直在寻找一段时间,但没有找到任何好运。创建子进程和多线程似乎很简单,但我真的找不到任何关于如何与这些线程/子进程通信的指南或教程。
查看完整描述

3 回答

?
PIPIONE

TA贡献1829条经验 获得超9个赞

就像其他人说的,你必须使用多个进程来实现真正的并行而不是线程,因为 GIL 限制阻止线程并发运行。


如果你想使用标准的多处理库(它基于启动多个进程),我建议使用一个worker 池。如果我理解正确,您想启动 100 多个并行实例。在一台主机上启动 100 多个进程会产生过多的开销。相反,创建一个 P 工人池,其中 P 是例如您机器中的核心数,并将 100 多个作业提交到池中。这很简单,网上有很多例子。此外,当您向池提交作业时,您可以提供回调函数来接收错误。这可能足以满足您的需求(有例子在这里)。


但是,上次我查看时,多处理中的池无法在多个主机(例如机器集群)之间分配工作。所以,如果你需要这样做,或者如果你需要一个更灵活的通信方案,比如能够在工作人员运行时向控制进程发送更新,我的建议是使用char4py(请注意,我是一个 char4py 开发人员,所以这是我有经验的地方)。


使用charm4py,您可以创建N 个工作进程,这些工作进程由运行时分布在P 个进程中(跨多个主机工作),并且工作进程只需通过远程方法调用即可与控制器通信。这是一个小例子:


from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded

import time


WORKER_ITERATIONS = 100



class Worker(Chare):


    def __init__(self, controller):

        self.controller = controller


    @threaded

    def work(self, x, done_future):

        result = -1

        try:

            for i in range(WORKER_ITERATIONS):

                if i % 20 == 0:

                    # send status update to controller

                    self.controller.progressUpdate(self.thisIndex, i, ret=True).get()

                if i == 5 and self.thisIndex[0] % 2 == 0:

                    # trigger NameError on even-numbered workers

                    test[3] = 3

                time.sleep(0.01)

            result = x**2

        except Exception as e:

            # send error to controller

            self.controller.collectError(self.thisIndex, e)

        # send result to controller

        self.contribute(result, Reducer.gather, done_future)



# This custom map is used to prevent workers from being created on process 0

# (where the controller is). Not strictly needed, but allows more timely

# controller output

class WorkerMap(ArrayMap):

    def procNum(self, index):

        return (index[0] % (charm.numPes() - 1)) + 1



class Controller(Chare):


    def __init__(self, args):

        self.startTime = time.time()

        done_future = charm.createFuture()

        # create 12 workers, which are distributed by charm4py among processes

        workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))

        # start work

        for i in range(12):

            workers[i].work(i, done_future)

        print('Results are', done_future.get())  # wait for result

        exit()


    def progressUpdate(self, worker_id, current_step):

        print(round(time.time() - self.startTime, 3), ': Worker', worker_id,

              'progress', current_step * 100 / WORKER_ITERATIONS, '%')

        # the controller can return a value here and the worker would receive it


    def collectError(self, worker_id, error):

        print(round(time.time() - self.startTime, 3), ': Got error', error,

              'from worker', worker_id)



charm.start(Controller)

在此示例中,控制器将在发生状态更新和错误时打印它们。完成所有工作后,它将打印所有工人的最终结果。失败的工作人员的结果将为 -1。


进程数 P 在启动时给出。运行时将在可用进程之间分配 N 个工作程序。这发生在创建工作程序并且在此特定示例中没有动态负载平衡时。


另外,请注意,在charm4py 模型中,远程方法调用是异步的,并返回调用者可以阻塞的未来,但只有调用线程会阻塞(而不是整个过程)。


希望这会有所帮助。


查看完整回答
反对 回复 2021-08-24
?
鸿蒙传说

TA贡献1865条经验 获得超7个赞

更好的方法是使用线程。如果您将要调用的脚本放入这个更大的脚本中的函数中,您可以让主函数根据需要多次调用此脚本,并让线程根据需要报告信息。您可以在此处阅读一些有关线程如何工作的信息。


查看完整回答
反对 回复 2021-08-24
  • 3 回答
  • 0 关注
  • 286 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信