3 回答
TA贡献1843条经验 获得超7个赞
以下代码可以使用支持和不支持out参数的随机生成器。虽然通常使用out参数会加快执行速度,但即使在没有并行执行的情况下,您也可以从使用并行执行中获得一些好处。
import os
import concurrent.futures
import numpy as np
def _rg_size(bit_gen, dist, num, *args, **kwargs):
return getattr(np.random.Generator(bit_gen), dist)(size=num, *args, **kwargs)
def _rg_out(bit_gen, dist, out, *args, **kwargs):
return getattr(np.random.Generator(bit_gen), dist)(out=out, *args, **kwargs)
def splitter(num, num_chunks):
chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
while num > chunk_size:
num -= chunk_size
yield chunk_size
yield num
def slicing_splitter(num, num_chunks):
chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
i = 0
remaining = num
while remaining > chunk_size:
remaining -= chunk_size
yield slice(i, i + chunk_size)
i += chunk_size
yield slice(i, num)
def new_rgs(rg):
while True:
new_rg = rg.jumped()
yield new_rg
rg = new_rg
def glue(arrs, size, num_arrs=None):
if num_arrs is None and hasattr(arrs, __len__):
num_arrs = len(arrs)
slicings = slicing_splitter(size, num_arrs)
arrs = iter(arrs)
arr = next(arrs)
slicing = next(slicings)
out = np.empty(size, dtype=arr.dtype)
out[slicing] = arr
for arr, slicing in zip(arrs, slicings):
out[slicing] = arr
return out
def parallel_rand_gen(
num=None,
dist='standard_normal',
bit_gen=None,
seed=None,
out=None,
num_workers=None,
split_factor=1,
*args,
**kwargs):
if num_workers is None:
num_workers = min(32, os.cpu_count() + 1)
if bit_gen is None:
bit_gen = np.random.PCG64(seed)
if out is not None:
shape = out.shape
out = out.ravel()
num = out.size
elif num is None:
raise ValueError('Either `num` or `out` must be specified.')
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
num_splits = split_factor * num_workers
if out is None:
futures = [
executor.submit(_rg_size, rg, dist, n, *args, **kwargs)
for rg, n in zip(new_rgs(bit_gen), splitter(num, num_splits))]
concurrent.futures.wait(futures)
result = (future.result() for future in futures)
# out = np.concatenate(tuple(result)) # slower alternative
out = glue(result, num, num_splits)
else:
futures = [
executor.submit(_rg_out, rg, dist, out[slicing], *args, **kwargs)
for rg, slicing in zip(new_rgs(bit_gen), slicing_splitter(num, num_splits))]
concurrent.futures.wait(futures)
out = out.reshape(shape)
return out
print(parallel_rand_gen(17))
# [ 0.96710075 2.2935126 0.35537793 0.5825714 2.14440658 0.64483092
# 0.54062617 0.44907003 0.11947266 -0.60748694 -0.52509115 0.62924905
# 0.51714721 0.29864705 -0.46105766 -0.271093 0.33055528]
为此standard_normal:
n = 10000001
%timeit parallel_rand_gen(n)
# 89.3 ms ± 1.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit out = np.empty(n); parallel_rand_gen(out=out)
# 74.6 ms ± 1.66 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.standard_normal(n)
# 181 ms ± 7.45 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
并且,对于binomial,这得到:
n = 10000001
%timeit parallel_rand_gen(n, 'binomial', n=100, p=0.5)
# 480 ms ± 15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.binomial(100, 0.5, n)
# 1.17 s ± 35.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
(在 4 核 6 岁笔记本电脑上测试。)
TA贡献1900条经验 获得超5个赞
根据您提供的示例,我创建了代码:
from numpy.random import Generator, PCG64
import multiprocessing
import concurrent.futures
import numpy as np
# to calculate the bernoulli randomness
from scipy.stats import bernoulli
# use this to see the results
import matplotlib.pyplot as plt
#benchmark the multi threading
from time import time
class MultithreadedRNG(object):
def __init__(self, n, seed=None, number_of_threads=None):
rg = PCG64(seed)
if number_of_threads is None:
number_of_threads = multiprocessing.cpu_count()
self.number_of_threads = number_of_threads
self._random_generators = [rg]
last_rg = rg
for _ in range(0, number_of_threads-1):
new_rg = last_rg.jumped()
self._random_generators.append(new_rg)
last_rg = new_rg
self.n = n
self.executor = concurrent.futures.ThreadPoolExecutor(number_of_threads) # use this object to multithread
self.value_array = np.empty(n) # reserve the array memory
self.step = np.ceil(n / number_of_threads).astype(np.int_) # round up to get the number of steps
def _thread_fill(self, rg, out, first, last):
p = 0.3
# x = np.random.randn(N_points) # this uses a normal distribution
self.value_array[first:last] = bernoulli.rvs(p, size=len(out[first:last]))
#self.value_array[first:last] = np.random.standard_normal(len(out[first:last]))
def fill(self):
futures = {}
for i in range(self.number_of_threads):
args = (
self._thread_fill,
self._random_generators[i],
self.value_array,
i * self.step,
(i + 1) * self.step
)
# this is a simple object to signal is complete
futures[self.executor.submit(*args)] = i
# wait for all the proccess to finish
concurrent.futures.wait(futures)
def __del__(self):
self.executor.shutdown(False)
if __name__ == "__main__":
arr_size = 1000000
# populate using multi thread
mrng = MultithreadedRNG(arr_size, seed=0)
multi_thread_time1 = time()
mrng.fill()
mrng.__del__()
print("Multi thread time: ", time() - multi_thread_time1)
# populate using single thread
single_thread_time1 = time()
vec = np.random.standard_normal(arr_size)
print("Single thread time: ", time() - single_thread_time1)
# see the results
print("Results: ", mrng.value_array)
fig, axs = plt.subplots(2, sharex=False)
axs[0].hist(vec, bins=30)
axs[0].set_title('Standard distribution')
axs[1].hist(mrng.value_array, bins=30)
axs[1].set_title('Bernouli distribution')
fig.tight_layout()
plt.show()
然后您可以更改伯努利分布的“p”值。运行示例如下所示:
TA贡献1827条经验 获得超4个赞
out
如果你想在numpy.random.Generator.random中绘制统一的浮点数,则有一个参数[0.0,1.0)
。
然后你可以只使用:
def bernoulli(shape, p): U = uniform(shape) # can use multithreading return U < p # should be fast enough
添加回答
举报