我有一个使用 Dask 编写的大型数据提取作业,其中每个任务将从数十个数据库的大量表中查询一个表。对于每个数据库实例,我想限制一次连接的任务数量(即限制)。例如,我可能有 100 个任务连接到数据库 A,100 个任务连接到数据库 B,100 个任务连接到数据库 C,等等,并且我想确保在任何给定时间连接到任何数据库的任务不超过 20 个。我发现 Dask 提供了基于工作线程资源(CPU、MEM、GPU 等)的约束,但是数据库资源是“全局”的,因此对于任何 Dask 工作线程来说都不是特定的。Dask 是否提供任何方法来对任务并发性的此类约束进行建模?
1 回答
幕布斯6054654
TA贡献1876条经验 获得超7个赞
Dask 提供分布式信号量,可以限制对数据库等资源的并发访问。
例子
import time
from dask.distributed import Client, Semaphore
client = Client(...)
def do_task(x, sem):
with sem:
time.sleep(5)
return x
# allow no more than 5 tasks to run concurrently
sem = Semaphore(max_leases=5, name="Limiter")
# submit jobs that use the semaphore
futures = client.map(do_task, range(20), sem=sem)
# collect results
results = client.gather(futures)
添加回答
举报
0/150
提交
取消