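The problem in your example is that modifications to standard mutable structures stored inside a Manager.dict are not propagated. I'll first show you how to fix it with a manager, just to show you better options afterwards.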
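multiprocessing.Manager is a bit heavyweight, since it uses a separate process for the Manager, and working with shared objects requires locks to keep the data consistent. If you run this on one machine, there are better options: multiprocessing.Pool, in case you don't have to run a customized Process class, and if you do, multiprocessing.Process together with multiprocessing.Queue would be the common way of doing it.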
The quoted parts are from the multiprocessing docs.
Manager
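"If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a __setitem__ on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy..."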
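A minimal sketch of the effect the quote describes, assuming Python 3 and run entirely in the parent process (no worker is needed, because every read through the dict proxy already returns a copy). The function name `propagation_demo` is hypothetical:

```python
from multiprocessing import Manager


def propagation_demo():
    """Contrast in-place mutation with re-assignment on a Manager dict."""
    with Manager() as manager:
        shared = manager.dict()

        # A plain list stored as a value: shared['lost'] returns a *copy*,
        # so append() only changes that local copy.
        shared['lost'] = []
        shared['lost'].append(1)

        # Read-modify-write: fetch the copy, modify it, store it back.
        # The assignment calls __setitem__ on the proxy and propagates.
        shared['kept'] = []
        tmp = shared['kept']
        tmp.append(1)
        shared['kept'] = tmp

        # Copy into ordinary lists before the manager shuts down.
        return list(shared['lost']), list(shared['kept'])


if __name__ == '__main__':
    print(propagation_demo())  # expected: ([], [1])
```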
In your case this would look like:
```python
def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                # Fetch copies, modify them, then re-assign so the
                # change propagates through the manager.
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y
```
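The lock here would be a manager.Lock that you have to pass along as an argument, since the whole (now) locked operation is not atomic by itself. (Here is an easier example of using a Lock with a Manager.)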
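This approach is perhaps less convenient than using nested proxy objects for most use cases, but it also demonstrates a certain level of control over synchronization.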
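Why the lock is needed can be sketched with a counter: each increment is a read followed by a write, two separate round trips to the manager, so concurrent workers can overwrite each other's updates. A minimal sketch, assuming hypothetical helpers `add_many` and `locked_counter` and a script file that child processes can import:

```python
from multiprocessing import Manager, Process


def add_many(shared, lock, n):
    """Increment shared['count'] n times using read-modify-write."""
    for _ in range(n):
        # Without the lock, two processes could read the same value and
        # one of the two increments would be lost.
        with lock:
            shared['count'] = shared['count'] + 1


def locked_counter(n_procs=4, n_increments=100):
    with Manager() as manager:
        shared = manager.dict(count=0)
        lock = manager.Lock()
        procs = [Process(target=add_many, args=(shared, lock, n_increments))
                 for _ in range(n_procs)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return shared['count']


if __name__ == '__main__':
    print(locked_counter())  # expected: 400
```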
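Since Python 3.6, proxy objects are nestable:
"Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager."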
Since Python 3.6 you can fill your manager.dict with manager.list values before starting the multiprocessing, and then append directly in the worker without having to reassign:
```python
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
```
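A minimal sketch of the nested-proxy behaviour, assuming Python 3.6 or newer; `nested_demo` is a hypothetical name:

```python
from multiprocessing import Manager


def nested_demo():
    """Append through a manager.list stored inside a manager.dict."""
    with Manager() as manager:
        shared = manager.dict()
        shared['x'] = manager.list()
        # shared['x'] now returns a ListProxy, so append() is executed in
        # the manager process directly; no re-assignment is needed.
        shared['x'].append(1)
        shared['x'].append(2)
        return list(shared['x'])


if __name__ == '__main__':
    print(nested_demo())  # expected: [1, 2]
```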
Edit:
Here is the full example with Manager:
```python
import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_utils.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below


def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():

            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create standard container with data from manager before exiting
        # the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)
```
Pool
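Most of the time, multiprocessing.Pool will just do it. Your example has an additional challenge, since you want to distribute iterations over a range. Your chunker function doesn't manage to divide the range so that every process has about the same work to do: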
```python
chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!
```
For the code below, please grab the code snippet for mp_utils.py from my answer here; it provides two functions to chunk ranges as evenly as possible.
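The mp_utils.py helpers are not reproduced here. As a rough stand-in (not the mp_utils code), the hypothetical function `even_batch_ranges` below gives the first `total % n_workers` batches one extra element, so batch lengths differ by at most one:

```python
def even_batch_ranges(total, n_workers):
    """Split range(total) into n_workers contiguous ranges whose
    lengths differ by at most one (hypothetical helper)."""
    base, extra = divmod(total, n_workers)
    # The first `extra` batches get one additional element.
    sizes = [base + 1] * extra + [base] * (n_workers - extra)
    ranges = []
    start = 0
    for size in sizes:
        ranges.append(range(start, start + size))
        start += size
    return ranges


if __name__ == '__main__':
    print(even_batch_ranges(22, 4))
    # expected: [range(0, 6), range(6, 12), range(12, 17), range(17, 22)]
```

For X_MAX = 100000000 and 8 workers this yields eight ranges of 12500000 values each.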
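With multiprocessing.Pool your worker function just has to return its result, and Pool takes care of transporting the results back to the parent process over internal queues. The result will be a list, so you will have to rearrange it again the way you want it. Your example could then look like this: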
```python
import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges


@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time: {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # flatten and unzip into x and y tuples
    print(f'results sorted: x: {x}, y: {y}')
```
Example output:

```
[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000),
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2

Each iteration took: 0.0000 s
Total time: 8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0
```
If your worker takes multiple arguments, you would build a "tasks" list of argument tuples and exchange pool.map(...) for pool.starmap(...iterable=tasks). See the docs for further details on that.
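A minimal sketch of the starmap variant, assuming a hypothetical second worker argument `threshold` in place of the hard-coded 0xf+1 and a hypothetical wrapper `run_with_starmap`:

```python
from multiprocessing import Pool


def worker(batch_range, threshold):
    """Return (x, y) pairs from batch_range where y <= threshold."""
    result = []
    for x in batch_range:
        y = (x + 5) ** 2 + x - 40
        if y <= threshold:
            result.append((x, y))
    return result


def run_with_starmap(batch_ranges, threshold, n_workers=2):
    # One argument tuple per call; starmap unpacks each tuple into worker().
    tasks = [(batch_range, threshold) for batch_range in batch_ranges]
    with Pool(n_workers) as pool:
        return pool.starmap(worker, tasks)


if __name__ == '__main__':
    print(run_with_starmap([range(0, 5), range(5, 10)], 0xf + 1))
    # expected: [[(0, -15), (1, -3), (2, 11)], []]
```

Like map, starmap returns the results in the order of the tasks list.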
Process & Queue
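If you can't use multiprocessing.Pool for some reason, you have to take care of inter-process communication (IPC) yourself, by passing a multiprocessing.Queue as an argument to the worker functions in the child processes and letting them enqueue their results to be sent back to the parent.
You will also have to build a Pool-like structure so you can iterate over it to start and join the processes, and you have to get() the results back from the queue. I wrote more about Queue.get usage here.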
A solution with this approach could look like this:
```python
import multiprocessing as mp
from multiprocessing import Process
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

# context_timer() as defined in the "Pool" section above


def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        # get() all results before join(), otherwise joining can deadlock
        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    # flatten and unzip; batches appear in queue-arrival order
    x, y = zip(*chain.from_iterable(results))
    print(f'results sorted: x: {x}, y: {y}')
```
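Because the queue hands results back in the order the workers finish, the collected list is not necessarily in batch order. One way to restore it, sketched here with a hypothetical `batch_index` tag and `run_ordered` wrapper, is to send the index along with each result and sort in the parent:

```python
import multiprocessing as mp


def worker(result_queue, batch_index, batch_range):
    """Put (batch_index, pairs) on the queue so the parent can reorder."""
    result = []
    for x in batch_range:
        y = (x + 5) ** 2 + x - 40
        if y <= 0xf + 1:
            result.append((x, y))
    result_queue.put((batch_index, result))


def run_ordered(batch_ranges):
    result_queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(result_queue, i, r))
             for i, r in enumerate(batch_ranges)]
    for p in procs:
        p.start()
    # Drain the queue before join(): a child with data still buffered in
    # the queue may not terminate, so joining first can deadlock.
    tagged = [result_queue.get() for _ in procs]
    for p in procs:
        p.join()
    # Results arrive in completion order; sort by batch index to restore
    # the original batch order.
    tagged.sort(key=lambda item: item[0])
    return [result for _, result in tagged]


if __name__ == '__main__':
    print(run_ordered([range(0, 5), range(5, 10), range(10, 15)]))
    # expected: [[(0, -15), (1, -3), (2, 11)], [], []]
```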