1 回答
TA贡献1875条经验 获得超5个赞
正如在问题中提到的,该程序对于足够小的数据集运行得很好(尽管它似乎绕过了 Ray 逻辑的几个方面),但它最终在大型数据集上崩溃了。仅使用 Ray 任务,我没有设法调用存储在 Object Store ( ValueError: buffer source array is read-only) 中的 Scipy Interpolator 对象,并且装饰所有函数没有意义,因为实际上只有主要函数应该同时执行(同时调用其他函数)。
因此,我决定更改程序结构以使用 Ray Actors。设置说明现在是该__init__方法的一部分。特别是,Scipy Interpolator 对象在此方法中定义并设置为 的属性self,就像全局变量一样。大多数函数(包括 main 函数)已成为类方法,但通过 Numba 编译的函数除外。对于后者,它们仍然是用 装饰的独立函数@jit,但它们中的每一个现在在调用 jitted 函数的类中都有一个等效的包装方法。
为了让我的程序并行执行我现在的主要方法,我依赖于 ActorPool。我创建了与可用 CPU 一样多的 actor,每个 actor 都执行 main 方法,成功调用方法和 Numba 编译的函数,同时还设法访问 Interpolator 对象。我只适用@ray.remote于定义的 Python 类。所有这些都转化为以下结构:
@ray.remote
class FooClass(object):
def __init__(self, initArgs):
# Initialisation
@staticmethod
def exampleStaticMethod(args):
# Processing
return
def exampleMethod(self, args):
# Processing
return
def exampleWrapperMethod(self, args):
return numbaCompiledFunction(args)
def mainMethod(self, poolMapArgs):
# Processing
return
@jit
def numbaCompiledFunction(args):
# Processing
return
ray.init(address='auto', redis_password=redPass)
actors = []
for actor in range(int(ray.cluster_resources()['CPU'])):
actors.append(FooClass.remote(initArgs))
pool = ActorPool(actors)
for unpackedTuple in pool.map_unordered(
lambda a, v: a.mainMethod.remote(v),
poolMapArgs):
# Processing
ray.shutdown()
这在分布在 4 个节点上的 192 个 CPU 上成功运行,没有任何警告或错误。
添加回答
举报