为了账号安全,请及时绑定邮箱和手机立即绑定

在 Dask 中,如何根据全局(而不是工作线程)资源约束来限制任务的调度?

在 Dask 中,如何根据全局(而不是工作线程)资源约束来限制任务的调度?

繁星coding 2023-08-15 18:37:16
我有一个使用 Dask 编写的大型数据提取作业,其中每个任务将从数十个数据库的大量表中查询一个表。对于每个数据库实例,我想限制一次连接的任务数量(即限制)。例如,我可能有 100 个任务连接到数据库 A,100 个任务连接到数据库 B,100 个任务连接到数据库 C,等等,并且我想确保在任何给定时间连接到任何数据库的任务不超过 20 个。我发现 Dask 提供了基于工作线程资源(CPU、MEM、GPU 等)的约束,但是数据库资源是“全局”的,因此对于任何 Dask 工作线程来说都不是特定的。Dask 是否提供任何方法来对任务并发性的此类约束进行建模?
查看完整描述

1 回答

?
幕布斯6054654

TA贡献1876条经验 获得超7个赞

Dask 提供分布式信号量,可以限制对数据库等资源的并发访问。

例子

import time

from dask.distributed import Client, Semaphore


client = Client(...)


def do_task(x, sem):

    with sem:

        time.sleep(5)

        return x


# allow no more than 5 tasks to run concurrently

sem = Semaphore(max_leases=5, name="Limiter")


# submit jobs that use the semaphore

futures = client.map(do_task, range(20), sem=sem)


# collect results

results = client.gather(futures)


查看完整回答
反对 回复 2023-08-15
  • 1 回答
  • 0 关注
  • 81 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信