就我而言,我在 S3 中有多个文件和一个自定义函数,该函数读取每个文件并使用所有线程处理它。为了简化示例,我只生成一个数据帧df,并且假设我的函数是tsfresh.extract_features使用多重处理。生成数据import pandas as pdfrom tsfresh import extract_featuresfrom tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \load_robot_execution_failuresdownload_robot_execution_failures()ts, y = load_robot_execution_failures()df = []for i in range(5): tts = ts.copy() tts["id"] += 88 * i df.append(tts) df = pd.concat(df, ignore_index=True)功能def fun(df, n_jobs): extracted_features = extract_features(df, column_id="id", column_sort="time", n_jobs=n_jobs)簇import daskfrom dask.distributed import Client, progressfrom dask import compute, delayedfrom dask_cloudprovider import FargateClustermy_vpc = # your vpcmy_subnets = # your subnetscpu = 2 ram = 4cluster = FargateCluster(n_workers=1, image='rpanai/feats-worker:2020-08-24', vpc=my_vpc, subnets=my_subnets, worker_cpu=int(cpu * 1024), worker_mem=int(ram * 1024), cloudwatch_logs_group="my_log_group", task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'], scheduler_timeout='20 minutes' )cluster.adapt(minimum=1, maximum=4)client = Client(cluster)client使用所有工作线程(失败)to_process = [delayed(fun)(df, cpu) for i in range(10)]out = compute(to_process)AssertionError: daemonic processes are not allowed to have children仅使用一个线程(OK)在这种情况下,它工作正常,但我浪费资源。to_process = [delayed(fun)(df, 0) for i in range(10)]out = compute(to_process)问题我知道对于这个特定的功能,我最终可以使用多线程和其他一些技巧编写一个自定义分配器,但我想分配一个工作,让每个工作人员都可以利用所有资源,而不必担心太多。
1 回答
元芳怎么了
TA贡献1798条经验 获得超7个赞
我可以帮助回答您的具体问题tsfresh
,但 iftsfresh
只是一个简单的玩具示例,可能不是您想要的。
对于tsfresh
,您通常不会混合使用tsfresh
dask 和 dask 的多重处理,而是让 dask 执行所有处理。这意味着,您从一个单一的开始dask.DataFrame
(在您的测试用例中,您可以将 pandas 数据帧转换为 dask 数据帧 - 对于您的读取用例,您可以直接从S3
docu读取),然后在 dask 数据帧中分发特征提取(特征提取的好处是,它在每个时间序列上独立工作。因此我们可以为每个时间序列生成一个作业)。
我不确定这是否有助于解决您更普遍的问题。在我看来,你(在大多数情况下)不想混合dask的分布函数和“本地”多核计算,而只是让dask处理一切。因为如果您位于 dask 集群上,您甚至可能不知道每台机器上有多少个核心(或者每个作业可能只获得一个核心)。
这意味着,如果您的作业可以分发 N 次,并且每个作业将启动 M 个子作业,您只需将“N x M”作业交给 dask 并让它计算其余部分(包括数据局部性)。
添加回答
举报
0/150
提交
取消