为了账号安全,请及时绑定邮箱和手机立即绑定

如何从并行进程中运行的函数中检索值?

如何从并行进程中运行的函数中检索值?

POPMUISE 2021-08-17 10:33:07
Multiprocessing 模块对于 Python 初学者来说非常令人困惑,特别是对于那些刚从 MATLAB 迁移并对其并行计算工具箱变得懒惰的人。我有以下函数需要大约 80 秒才能运行,我想通过使用 Python 的多处理模块来缩短这个时间。from time import timexmax   = 100000000start = time()for x in range(xmax):    y = ((x+5)**2+x-40)    if y <= 0xf+1:        print('Condition met at: ', y, x)end  = time()tt   = end-start #total timeprint('Each iteration took: ', tt/xmax)print('Total time:          ', tt)这按预期输出:Condition met at:  -15 0Condition met at:  -3 1Condition met at:  11 2Each iteration took:  8.667453265190124e-07Total time:           86.67453265190125由于循环的任何迭代都不依赖于其他循环,因此我尝试采用官方文档中的此服务器进程来扫描单独进程中的范围块。最后我想出了 vartec 对这个问题的回答,可以准备以下代码。我还根据 Darkonaut 对当前问题的回答更新了代码。from time import time import multiprocessing as mpdef chunker (rng, t): # this functions makes t chunks out of rng    L  = rng[1] - rng[0]    Lr = L % t    Lm = L // t    h  = rng[0]-1    chunks = []    for i in range(0, t):        c  = [h+1, h + Lm]        h += Lm        chunks.append(c)    chunks[t-1][1] += Lr + 1    return chunksdef worker(lock, xrange, return_dict):    '''worker function'''    for x in range(xrange[0], xrange[1]):        y = ((x+5)**2+x-40)        if y <= 0xf+1:            print('Condition met at: ', y, x)            return_dict['x'].append(x)            return_dict['y'].append(y)            with lock:                                list_x = return_dict['x']                list_y = return_dict['y']                list_x.append(x)                list_y.append(y)                return_dict['x'] = list_x                return_dict['y'] = list_y这将运行时间大大减少到约 17 秒。但是,我的共享变量无法检索任何值。请帮我找出代码的哪一部分出错了。
查看完整描述

1 回答

?
达令说

TA贡献1821条经验 获得超6个赞

您示例中的问题是Manager.dict不会传播对其中的标准可变结构的修改。我首先向您展示如何与经理一起修复它,只是为了向您展示更好的选择。

multiprocessing.Manager有点重,因为它使用单独的进程来Manager处理共享对象需要使用锁来保持数据一致性。如果您在一台机器上运行它,有更好的选择multiprocessing.Pool,以防您不必运行自定义Process类,如果必须,multiprocessing.Processmultiprocessing.Queue将是执行此操作的常用方法。

引用部分来自多处理文档。


经理

如果标准(非代理)列表或字典对象包含在引用对象中,对这些可变值的修改将不会通过管理器传播,因为代理无法知道其中包含的值何时被修改。但是,在容器代理中存储一个值(触发代理对象上的setitem)确实会通过管理器传播,因此要有效地修改这样的项目,可以将修改后的值重新分配给容器代理......

在您的情况下,这看起来像:

def worker(xrange, return_dict, lock):

    """worker function"""

    for x in range(xrange[0], xrange[1]):

        y = ((x+5)**2+x-40)

        if y <= 0xf+1:

            print('Condition met at: ', y, x)

            with lock:

                list_x = return_dict['x']

                list_y = return_dict['y']

                list_x.append(x)

                list_y.append(y)

                return_dict['x'] = list_x

                return_dict['y'] = list_y

在lock这里将是一个manager.Lock你必须传递下去作为参数,因为整个(现在)被锁定的操作本身不是原子实例。(这 是一个Manager使用 Lock的更简单的例子)


对于大多数用例,这种方法可能不如使用嵌套代理对象方便,但也展示了对同步的一定程度的控制。


由于 Python 3.6 代理对象是可嵌套的:


在 3.6 版更改: 共享对象能够嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,这些对象都将由 SyncManager 管理和同步。


从 Python 3.6 开始,您可以manager.dict在开始多处理之前使用manager.listas 值填充您的值,然后直接附加到工作程序中而无需重新分配。


return_dict['x'] = manager.list()

return_dict['y'] = manager.list()

编辑:


这是完整的示例Manager:


import time

import multiprocessing as mp

from multiprocessing import Manager, Process

from contextlib import contextmanager

# mp_util.py from first link in code-snippet for "Pool"

# section below

from mp_utils import calc_batch_sizes, build_batch_ranges


# def context_timer ... see code snippet in "Pool" section below


def worker(batch_range, return_dict, lock):

    """worker function"""

    for x in batch_range:

        y = ((x+5)**2+x-40)

        if y <= 0xf+1:

            print('Condition met at: ', y, x)

            with lock:

                return_dict['x'].append(x)

                return_dict['y'].append(y)



if __name__ == '__main__':


    N_WORKERS = mp.cpu_count()

    X_MAX = 100000000


    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)

    batch_ranges = build_batch_ranges(batch_sizes)

    print(batch_ranges)


    with Manager() as manager:

        lock = manager.Lock()

        return_dict = manager.dict()

        return_dict['x'] = manager.list()

        return_dict['y'] = manager.list()


        tasks = [(batch_range, return_dict, lock)

                 for batch_range in batch_ranges]


        with context_timer():


            pool = [Process(target=worker, args=args)

                    for args in tasks]


            for p in pool:

                p.start()

            for p in pool:

                p.join()


        # Create standard container with data from manager before exiting

        # the manager.

        result = {k: list(v) for k, v in return_dict.items()}


    print(result)

水池


大多数情况下,multiprocessing.Pool只会这样做。由于您想在一个范围内分布迭代,因此您的示例还有一个额外的挑战。chunker即使每个进程都有大致相同的工作要做,您的函数也无法划分范围:


chunker((0, 21), 4)

# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!

对于下面请抢代码的代码片段mp_utils.py从我的答案在这里,它提供了两个功能区块范围内尽量均匀。


随着multiprocessing.Pool你的worker功能,只是必须返回结果,并Pool会采取运送结果返回的照顾了内部队列回父进程。这result将是一个列表,因此您必须以您希望的方式再次重新排列结果。您的示例可能如下所示:


import time

import multiprocessing as mp

from multiprocessing import Pool

from contextlib import contextmanager

from itertools import chain


from mp_utils import calc_batch_sizes, build_batch_ranges


@contextmanager

def context_timer():

    start_time = time.perf_counter()

    yield

    end_time = time.perf_counter()

    total_time   = end_time-start_time

    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')

    print(f'Total time:          {total_time:.4f} s\n')



def worker(batch_range):

    """worker function"""

    result = []

    for x in batch_range:

        y = ((x+5)**2+x-40)

        if y <= 0xf+1:

            print('Condition met at: ', y, x)

            result.append((x, y))

    return result



if __name__ == '__main__':


    N_WORKERS = mp.cpu_count()

    X_MAX = 100000000


    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)

    batch_ranges = build_batch_ranges(batch_sizes)

    print(batch_ranges)


    with context_timer():

        with Pool(N_WORKERS) as pool:

            results = pool.map(worker, iterable=batch_ranges)


    print(f'results: {results}')

    x, y = zip(*chain.from_iterable(results))  # filter and sort results

    print(f'results sorted: x: {x}, y: {y}')

示例输出:


[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), 

range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]

Condition met at:  -15 0

Condition met at:  -3 1

Condition met at:  11 2


Each iteration took: 0.0000 s

Total time:          8.2408 s


results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]

results sorted: x: (0, 1, 2), y: (-15, -3, 11)


Process finished with exit code 0

如果您有多个参数,您worker将使用参数元组构建一个“任务”列表,并pool.map(...)与pool.starmap(...iterable=tasks). 有关更多详细信息,请参阅文档。


进程和队列


如果multiprocessing.Pool由于某种原因无法使用,则必须自己处理进程间通信 (IPC),方法是将multiprocessing.Queueas 参数传递给子 进程中的工作函数,并让它们将结果排入队列以发送回给父母。


您还必须构建类似 Pool 的结构,以便您可以对其进行迭代以启动和加入进程,并且您必须get()从队列中返回结果。Queue.get我在这里写了更多关于用法的信息。


使用这种方法的解决方案可能如下所示:


def worker(result_queue, batch_range):

    """worker function"""

    result = []

    for x in batch_range:

        y = ((x+5)**2+x-40)

        if y <= 0xf+1:

            print('Condition met at: ', y, x)

            result.append((x, y))

    result_queue.put(result)  # <--



if __name__ == '__main__':


    N_WORKERS = mp.cpu_count()

    X_MAX = 100000000


    result_queue = mp.Queue()  # <--

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)

    batch_ranges = build_batch_ranges(batch_sizes)

    print(batch_ranges)


    with context_timer():


        pool = [Process(target=worker, args=(result_queue, batch_range))

                for batch_range in batch_ranges]


        for p in pool:

            p.start()


        results = [result_queue.get() for _ in batch_ranges]


        for p in pool:

            p.join()


    print(f'results: {results}')

    x, y = zip(*chain.from_iterable(results))  # filter and sort results

    print(f'results sorted: x: {x}, y: {y}')


查看完整回答
反对 回复 2021-08-17
  • 1 回答
  • 0 关注
  • 129 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信