为了账号安全,请及时绑定邮箱和手机立即绑定

如何在列表的“大数据”字典上进行并行计算?

如何在列表的“大数据”字典上进行并行计算?

犯罪嫌疑人X 2022-08-25 14:47:20
我有一个关于在python字典上进行计算的问题----在这种情况下,字典有数百万个键,列表同样很长。似乎存在分歧,是否可以在这里使用并行化,所以我将在这里更明确地提出这个问题。这是最初的问题:优化解析海量Python字典,多线程这是一个玩具(小)python字典:example_dict1 = {'key1':[367, 30, 847, 482, 887, 654, 347, 504, 413, 821],    'key2':[754, 915, 622, 149, 279, 192, 312, 203, 742, 846],     'key3':[586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}假设我需要解析列表的值,我已将其实现为以下简单(玩具)函数:def manipulate_values(input_list):    return_values = []    for i in input_list:        new_value = i ** 2 - 13        return_values.append(new_value)    return return_values现在,我可以轻松解析此字典的值,如下所示:for key, value in example_dict1.items():    example_dict1[key] = manipulate_values(value)导致以下情况:example_dict1 = {'key1': [134676, 887, 717396, 232311, 786756, 427703, 120396, 254003, 170556, 674028],      'key2': [568503, 837212, 386871, 22188, 77828, 36851, 97331, 41196, 550551, 715703],      'key3': [343383, 271428, 220887, 226563, 480236, 181463, 556503, 537276, 278771, 319212]}问:为什么我不能使用多个线程来执行此计算,例如三个线程,一个用于 、和 ?会在这里工作吗?key1key2key3concurrent.futures.ProcessPoolExecutor()原始问题:有没有更好的方法来优化这种快速获取?
查看完整描述

2 回答

?
慕妹3242003

TA贡献1824条经验 获得超6个赞

python线程不会真正帮助你并行处理,因为它们是在同一个“真正的CPU线程”上执行的,python线程在你处理异步HTTP调用时很有帮助。

关于来自文档ProcessPoolExecutor

concurrent.futures.ProcessPoolExecutor()

ProcessPoolExecutor 类是一个执行器子类,它使用进程池异步执行调用。ProcessPoolExecutor使用多处理模块,这允许它避开全局解释器锁,但也意味着只能执行和返回可拾取的对象。

如果您需要高CPU处理,它可以为您提供帮助,您可以使用:

import concurrent



def manipulate_values(k_v):

    k, v = k_v

    return_values = []

    for i in v :

        new_value = i ** 2 - 13

        return_values.append(new_value)

    return k, return_values



with concurrent.futures.ProcessPoolExecutor() as executor:

        example_dict = dict(executor.map(manipulate_values, example_dict1.items()))

这是一个简单的基准测试,使用一个简单的循环来处理你的数据,而不是使用,我的场景假设对于要处理的每个项目,你需要大约50ms的CPU时间:forProcessPoolExecutor

您可以看到如果要处理的每个项目的CPU时间高的真正好处ProcessPoolExecutor


from simple_benchmark import BenchmarkBuilder

import time

import concurrent


b = BenchmarkBuilder()


def manipulate_values1(k_v):

    k, v = k_v

    time.sleep(0.05)

    return k, v


def manipulate_values2(v):

    time.sleep(0.05)

    return v


@b.add_function()

def test_with_process_pool_executor(d):

    with concurrent.futures.ProcessPoolExecutor() as executor:

            return dict(executor.map(manipulate_values1, d.items()))


@b.add_function()       

def test_simple_for_loop(d):

    for key, value in d.items():

        d[key] = manipulate_values2((key, value))



@b.add_arguments('Number of keys in dict')

def argument_provider():

    for exp in range(2, 10):

        size = 2**exp

        yield size, {i: [i] * 10_000 for i in range(size)}


r = b.run()

r.plot()

如果您没有为 ProcessPoolExecutor 设置工作线程数,则默认的工作线程数将等于计算机上的处理器数(对于基准测试,我使用的是一台 CPU 为 8 的电脑)。


但在您的情况下,根据问题中提供的数据,处理1个项目将需要约3 μs:


%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])

2.32 µs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

在这种情况下,基准测试将如下所示:在此输入图像描述

因此,如果要处理的一个项目的 CPU 时间较短,则最好使用简单的 for 循环。


@user3666197提出的一个很好的观点是,当你有巨大的项目/列表时,我使用列表中的随机数对这两种方法进行了基准测试:1_000_000_000

在此输入图像描述

如您所见,在这种情况下更适合使用ProcessPoolExecutor

from simple_benchmark import BenchmarkBuilder

import time

import concurrent

from random import choice


b = BenchmarkBuilder()


def manipulate_values1(k_v):

    k, v = k_v

    return_values = []

    for i in v:

        new_value = i ** 2 - 13

        return_values.append(new_value)


    return k, return_values


def manipulate_values2(v):

    return_values = []

    for i in v:

        new_value = i ** 2 - 13

        return_values.append(new_value)

    return return_values


@b.add_function()

def test_with_process_pool_executor(d):

    with concurrent.futures.ProcessPoolExecutor() as executor:

            return dict(executor.map(manipulate_values1, d.items()))


@b.add_function()       

def test_simple_for_loop(d):

    for key, value in d.items():

        d[key] = manipulate_values2(value)



@b.add_arguments('Number of keys in dict')

def argument_provider():

    for exp in range(2, 5):

        size = 2**exp

        yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}


r = b.run()

r.plot()

预期,因为处理一个项目需要大约209ms:


l = [367] * 1_000_000

%timeit manipulate_values2(l)

# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

不过,最快的选择是将numpy.arrays与循环解决方案一起使用:for

from simple_benchmark import BenchmarkBuilder

import time

import concurrent

import numpy as np


b = BenchmarkBuilder()


def manipulate_values1(k_v):

    k, v = k_v

    return k,  v ** 2 - 13


def manipulate_values2(v):

    return v ** 2 - 13


@b.add_function()

def test_with_process_pool_executor(d):

    with concurrent.futures.ProcessPoolExecutor() as executor:

            return dict(executor.map(manipulate_values1, d.items()))


@b.add_function()       

def test_simple_for_loop(d):

    for key, value in d.items():

        d[key] = manipulate_values2(value)



@b.add_arguments('Number of keys in dict')

def argument_provider():

    for exp in range(2, 7):

        size = 2**exp

        yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}


r = b.run()

r.plot()

预计简单循环会更快,因为处理一个numpy.array需要<1ms:for


def manipulate_value2( input_list ):

    return input_list ** 2 - 13


l = np.random.randint(0, 1000, size=1_000_000)

%timeit manipulate_values2(l)

# 951 µs ± 5.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


查看完整回答
反对 回复 2022-08-25
?
月关宝盒

TA贡献1772条经验 获得超5个赞

问:“为什么我不能使用多个线程来执行此计算,例如三个线程,一个用于 key1、key2 和 key3?”


你可以,但对性能没有合理的影响 - 了解python如何处理基于线程的执行流的所有细节在这里是基本的。了解 GIL 锁定技巧,正确使用它,避免任何并发处理及其对性能的影响,您将获得 WHY 部分。


Q : “concurrent.futures.ProcessPoolExecutor() 在这里工作吗?”


愿意。


然而,它们的净效应(如果有任何比纯处理流“更快”)将取决于给定大小的“大”列表(如上文所警告的那样,“数百万个密钥,并且列表同样长”),这些列表应该被复制(RAM-I/ O)并传递(SER /DES处理+IPC传输)到生成的(基于进程的)远程执行器池。[SERIAL]


这些多次重复的RAM-I/O + SER/DES附加开销成本将很快占据主导地位。


RAM-I/O 复制步骤:


>>> from zmq import Stopwatch; aClk = Stopwatch()


>>> aClk.start(); aList = [ i for i in range( int( 1E4 ) ) ]; aClk.stop()

   1345 [us] to copy a List of 1E4 elements

>>> aClk.start(); aList = [ i for i in range( int( 1E5 ) ) ]; aClk.stop()

  12776 [us] to copy a List of 1E5 elements

>>> aClk.start(); aList = [ i for i in range( int( 1E6 ) ) ]; aClk.stop()

 149197 [us] to copy a List of 1E6 elements

>>> aClk.start(); aList = [ i for i in range( int( 1E7 ) ) ]; aClk.stop()

1253792 [us] to copy a List of 1E7 elements

|  |::: [us]

|  +--- [ms]

+------ [ s]

SER/DES 步骤 :


>>> import pickle

>>> aClk.start(); _ = pickle.dumps( aList ); aClk.stop()

 608323 

 615851

 638821 [us] to copy pickle.dumps() a List of 1E7 elements

|  |::: [us]

|  +--- [ms]

+------ [ s]

因此,每个批次的附加开销预期为 ~ 2 x ( 1253 + 608 ) [ms] + IPC 传输成本,只需一次 1E7 个项目


manipulate_values() 的实际有用工作有效负载非常小,以至于所有附加成本的一次性总和几乎无法支付与在远程工作人员池中分配工作单元相关的额外费用。矢量化计算形式有望带来更智能的结果。这里的附加成本比少量的有用工作要大得多。


模式将更多地取决于SER/DES参数通过“那里”的开销成本,加上SER/DES返回结果的附加成本 - 所有这些都将决定净效应(<<1.0 x的反加速经常在用例中观察到,但引入只是一个糟糕的设计端工程实践, 没有后期基准可以挽救已经烧毁的人*天,浪费在如此糟糕的设计决策中)


查看完整回答
反对 回复 2022-08-25
  • 2 回答
  • 0 关注
  • 79 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信