为了账号安全,请及时绑定邮箱和手机立即绑定

在 numpy 二项式函数上设置输出矩阵

在 numpy 二项式函数上设置输出矩阵

紫衣仙女 2022-12-20 14:49:39
我想在我的项目中使用以下示例,它使用多个线程用随机值填充数组。 https://numpy.org/doc/1.18/reference/random/multithreading.html。但是,我想使用二项分布而不是标准正态分布。我的问题是numpy.random.Generator.binomial方法没有放置结果的“out”参数(如 standard_normal 方法)。这意味着我将不得不将给我的输出矩阵复制到我的矩阵中,这会大大降低性能。是否有替代方法可以解决此问题?如果这有帮助,我实际上需要伯努利分布,即二项分布中的 n=1(但任意 p)。
查看完整描述

3 回答

?
蓝山帝景

TA贡献1843条经验 获得超7个赞

以下代码可以使用支持和不支持out参数的随机生成器。虽然通常使用out参数会加快执行速度,但即使在没有并行执行的情况下,您也可以从使用并行执行中获得一些好处。


import os

import concurrent.futures

import numpy as np



def _rg_size(bit_gen, dist, num, *args, **kwargs):

    return getattr(np.random.Generator(bit_gen), dist)(size=num, *args, **kwargs)



def _rg_out(bit_gen, dist, out, *args, **kwargs):

    return getattr(np.random.Generator(bit_gen), dist)(out=out, *args, **kwargs)



def splitter(num, num_chunks):

    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)

    while num > chunk_size:

        num -= chunk_size

        yield chunk_size

    yield num



def slicing_splitter(num, num_chunks):

    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)

    i = 0

    remaining = num

    while remaining > chunk_size:

        remaining -= chunk_size

        yield slice(i, i + chunk_size)

        i += chunk_size

    yield slice(i, num)



def new_rgs(rg):

    while True:

        new_rg = rg.jumped()

        yield new_rg

        rg = new_rg



def glue(arrs, size, num_arrs=None):

    if num_arrs is None and hasattr(arrs, __len__):

        num_arrs = len(arrs)

    slicings = slicing_splitter(size, num_arrs)

    arrs = iter(arrs)

    arr = next(arrs)

    slicing = next(slicings)

    out = np.empty(size, dtype=arr.dtype)

    out[slicing] = arr

    for arr, slicing in zip(arrs, slicings):

        out[slicing] = arr

    return out



def parallel_rand_gen(

        num=None,

        dist='standard_normal',

        bit_gen=None,

        seed=None,

        out=None,

        num_workers=None,

        split_factor=1,

        *args,

        **kwargs):

    if num_workers is None:

        num_workers = min(32, os.cpu_count() + 1)

    if bit_gen is None:

        bit_gen = np.random.PCG64(seed)

    if out is not None:

        shape = out.shape

        out = out.ravel()

        num = out.size

    elif num is None:

        raise ValueError('Either `num` or `out` must be specified.')

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:

        num_splits = split_factor * num_workers

        if out is None:

            futures = [

                executor.submit(_rg_size, rg, dist, n, *args, **kwargs)

                for rg, n in zip(new_rgs(bit_gen), splitter(num, num_splits))]

            concurrent.futures.wait(futures)

            result = (future.result() for future in futures)

            # out = np.concatenate(tuple(result))  # slower alternative

            out = glue(result, num, num_splits)

        else:

            futures = [

                executor.submit(_rg_out, rg, dist, out[slicing], *args, **kwargs)

                for rg, slicing in zip(new_rgs(bit_gen), slicing_splitter(num, num_splits))]

            concurrent.futures.wait(futures)

            out = out.reshape(shape)

    return out



print(parallel_rand_gen(17))

# [ 0.96710075  2.2935126   0.35537793  0.5825714   2.14440658  0.64483092

#   0.54062617  0.44907003  0.11947266 -0.60748694 -0.52509115  0.62924905

#   0.51714721  0.29864705 -0.46105766 -0.271093    0.33055528]

为此standard_normal:


n = 10000001

%timeit parallel_rand_gen(n)

# 89.3 ms ± 1.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit out = np.empty(n); parallel_rand_gen(out=out)

# 74.6 ms ± 1.66 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit rg = np.random.Generator(np.random.PCG64()); rg.standard_normal(n)

# 181 ms ± 7.45 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

并且,对于binomial,这得到:


n = 10000001

%timeit parallel_rand_gen(n, 'binomial', n=100, p=0.5)

# 480 ms ± 15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit rg = np.random.Generator(np.random.PCG64()); rg.binomial(100, 0.5, n)

# 1.17 s ± 35.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

(在 4 核 6 岁笔记本电脑上测试。)


查看完整回答
反对 回复 2022-12-20
?
梵蒂冈之花

TA贡献1900条经验 获得超5个赞

根据您提供的示例,我创建了代码:


from numpy.random import Generator, PCG64

import multiprocessing

import concurrent.futures

import numpy as np


# to calculate the bernoulli randomness

from scipy.stats import bernoulli


# use this to see the results

import matplotlib.pyplot as plt


#benchmark the multi threading

from time import time


class MultithreadedRNG(object):

    def __init__(self, n, seed=None, number_of_threads=None):

        rg = PCG64(seed)

        if number_of_threads is None:

            number_of_threads = multiprocessing.cpu_count()

        self.number_of_threads = number_of_threads


        self._random_generators = [rg]

        last_rg = rg

        for _ in range(0, number_of_threads-1):

            new_rg = last_rg.jumped()

            self._random_generators.append(new_rg)

            last_rg = new_rg


        self.n = n

        self.executor = concurrent.futures.ThreadPoolExecutor(number_of_threads) # use this  object to multithread

        self.value_array = np.empty(n) # reserve the array memory

        self.step = np.ceil(n / number_of_threads).astype(np.int_) # round up to get the number of steps


    def _thread_fill(self, rg, out, first, last):

        p = 0.3


        # x = np.random.randn(N_points) # this uses a normal distribution

        self.value_array[first:last] = bernoulli.rvs(p, size=len(out[first:last]))

        #self.value_array[first:last] = np.random.standard_normal(len(out[first:last]))


    def fill(self):


        futures = {}

        for i in range(self.number_of_threads):

            args = (

                self._thread_fill,

                self._random_generators[i],

                self.value_array,

                i * self.step,

                (i + 1) * self.step

                )


            # this is a simple object to signal is complete

            futures[self.executor.submit(*args)] = i


        # wait for all the proccess to finish

        concurrent.futures.wait(futures)


    def __del__(self):

        self.executor.shutdown(False)


if __name__ == "__main__":


    arr_size = 1000000


    # populate using multi thread

    mrng = MultithreadedRNG(arr_size, seed=0)

    multi_thread_time1 = time()

    mrng.fill()

    mrng.__del__()

    print("Multi thread time: ", time() - multi_thread_time1)


    # populate using single thread

    single_thread_time1 = time()

    vec = np.random.standard_normal(arr_size)

    print("Single thread time: ", time() - single_thread_time1)


    # see the results

    print("Results: ", mrng.value_array)

    fig, axs = plt.subplots(2, sharex=False)

    axs[0].hist(vec, bins=30)

    axs[0].set_title('Standard distribution')

    axs[1].hist(mrng.value_array, bins=30)

    axs[1].set_title('Bernouli distribution')

    fig.tight_layout()

    plt.show()

然后您可以更改伯努利分布的“p”值。运行示例如下所示:

//img1.sycdn.imooc.com//63a15b420001b14e06330465.jpg

查看完整回答
反对 回复 2022-12-20
?
GCT1015

TA贡献1827条经验 获得超4个赞

out如果你想在numpy.random.Generator.random中绘制统一的浮点数,则有一个参数[0.0,1.0)

然后你可以只使用:

def bernoulli(shape, p):
    U = uniform(shape) # can use multithreading
    return U < p       # should be fast enough


查看完整回答
反对 回复 2022-12-20
  • 3 回答
  • 0 关注
  • 96 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信