2 回答
TA贡献1824条经验 获得超6个赞
python线程不会真正帮助你并行处理,因为它们是在同一个“真正的CPU线程”上执行的,python线程在你处理异步HTTP调用时很有帮助。
关于来自文档:ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutor()
ProcessPoolExecutor 类是一个执行器子类,它使用进程池异步执行调用。ProcessPoolExecutor使用多处理模块,这允许它避开全局解释器锁,但也意味着只能执行和返回可拾取的对象。
如果您需要高CPU处理,它可以为您提供帮助,您可以使用:
import concurrent
def manipulate_values(k_v):
k, v = k_v
return_values = []
for i in v :
new_value = i ** 2 - 13
return_values.append(new_value)
return k, return_values
with concurrent.futures.ProcessPoolExecutor() as executor:
example_dict = dict(executor.map(manipulate_values, example_dict1.items()))
这是一个简单的基准测试,使用一个简单的循环来处理你的数据,而不是使用,我的场景假设对于要处理的每个项目,你需要大约50ms的CPU时间:forProcessPoolExecutor
您可以看到如果要处理的每个项目的CPU时间高的真正好处ProcessPoolExecutor
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
time.sleep(0.05)
return k, v
def manipulate_values2(v):
time.sleep(0.05)
return v
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2((key, value))
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 10):
size = 2**exp
yield size, {i: [i] * 10_000 for i in range(size)}
r = b.run()
r.plot()
如果您没有为 ProcessPoolExecutor 设置工作线程数,则默认的工作线程数将等于计算机上的处理器数(对于基准测试,我使用的是一台 CPU 为 8 的电脑)。
但在您的情况下,根据问题中提供的数据,处理1个项目将需要约3 μs:
%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])
2.32 µs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
因此,如果要处理的一个项目的 CPU 时间较短,则最好使用简单的 for 循环。
@user3666197提出的一个很好的观点是,当你有巨大的项目/列表时,我使用列表中的随机数对这两种方法进行了基准测试:1_000_000_000
如您所见,在这种情况下更适合使用ProcessPoolExecutor
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
from random import choice
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
return_values = []
for i in v:
new_value = i ** 2 - 13
return_values.append(new_value)
return k, return_values
def manipulate_values2(v):
return_values = []
for i in v:
new_value = i ** 2 - 13
return_values.append(new_value)
return return_values
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2(value)
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 5):
size = 2**exp
yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}
r = b.run()
r.plot()
预期,因为处理一个项目需要大约209ms:
l = [367] * 1_000_000
%timeit manipulate_values2(l)
# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
不过,最快的选择是将numpy.arrays与循环解决方案一起使用:for
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
import numpy as np
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
return k, v ** 2 - 13
def manipulate_values2(v):
return v ** 2 - 13
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2(value)
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 7):
size = 2**exp
yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}
r = b.run()
r.plot()
预计简单循环会更快,因为处理一个numpy.array需要<1ms:for
def manipulate_value2( input_list ):
return input_list ** 2 - 13
l = np.random.randint(0, 1000, size=1_000_000)
%timeit manipulate_values2(l)
# 951 µs ± 5.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
TA贡献1772条经验 获得超5个赞
问:“为什么我不能使用多个线程来执行此计算,例如三个线程,一个用于 key1、key2 和 key3?”
你可以,但对性能没有合理的影响 - 了解python如何处理基于线程的执行流的所有细节在这里是基本的。了解 GIL 锁定技巧,正确使用它,避免任何并发处理及其对性能的影响,您将获得 WHY 部分。
Q : “concurrent.futures.ProcessPoolExecutor() 在这里工作吗?”
愿意。
然而,它们的净效应(如果有任何比纯处理流“更快”)将取决于给定大小的“大”列表(如上文所警告的那样,“数百万个密钥,并且列表同样长”),这些列表应该被复制(RAM-I/ O)并传递(SER /DES处理+IPC传输)到生成的(基于进程的)远程执行器池。[SERIAL]
这些多次重复的RAM-I/O + SER/DES附加开销成本将很快占据主导地位。
RAM-I/O 复制步骤:
>>> from zmq import Stopwatch; aClk = Stopwatch()
>>> aClk.start(); aList = [ i for i in range( int( 1E4 ) ) ]; aClk.stop()
1345 [us] to copy a List of 1E4 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E5 ) ) ]; aClk.stop()
12776 [us] to copy a List of 1E5 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E6 ) ) ]; aClk.stop()
149197 [us] to copy a List of 1E6 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E7 ) ) ]; aClk.stop()
1253792 [us] to copy a List of 1E7 elements
| |::: [us]
| +--- [ms]
+------ [ s]
SER/DES 步骤 :
>>> import pickle
>>> aClk.start(); _ = pickle.dumps( aList ); aClk.stop()
608323
615851
638821 [us] to copy pickle.dumps() a List of 1E7 elements
| |::: [us]
| +--- [ms]
+------ [ s]
因此,每个批次的附加开销预期为 ~ 2 x ( 1253 + 608 ) [ms] + IPC 传输成本,只需一次 1E7 个项目
manipulate_values() 的实际有用工作有效负载非常小,以至于所有附加成本的一次性总和几乎无法支付与在远程工作人员池中分配工作单元相关的额外费用。矢量化计算形式有望带来更智能的结果。这里的附加成本比少量的有用工作要大得多。
模式将更多地取决于SER/DES参数通过“那里”的开销成本,加上SER/DES返回结果的附加成本 - 所有这些都将决定净效应(<<1.0 x的反加速经常在用例中观察到,但引入只是一个糟糕的设计端工程实践, 没有后期基准可以挽救已经烧毁的人*天,浪费在如此糟糕的设计决策中)
添加回答
举报