假设我想要一个队列,其中任何时候只异步处理 3 个项目,我该怎么做?这就是我的意思:如果我有一组项目要上传到后端,即将一些人工制品上传到云存储,然后创建/更新一个文档以反映每个人工制品的 url,我不想:在下一次之前异步/等待每个上传操作 - 因为这会很慢同时发送所有内容 - 这可能导致写入热点或速率限制做一个 promise.race - 这最终导致(2)做一个 promise.all - 如果有一个长时间运行的上传,这个过程会变慢。我想做的是:有一个所有上传的队列,比如使用 RxJs 创建方法,例如from(array-of-upload-items)在任何时候处理 3 个项目的堆栈。当一个项目离开堆栈即完成时,我们将一个新项目添加到队列中确保在任何一点,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈。我将如何使用 RxJs 来解决这个问题?编辑:2020 年 6 月 27 日这就是我的想法:const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so rxQueue .pipe( mergeMap((item) => of(item).pipe( tap(async (item) => { await Promise.race([ processUpload(item[0]), processUpload(item[1]), processUpload(item[2]), ]) }), ), 3 ), ) .subscribe()目标是确保在任何时候都处理(上传)3 个文件,以至于如果一个文件上传过程结束,则添加另一个文件以将堆栈保持在 3 个上传过程中。同理,如果 2 个文件上传同时结束,则将 2 个新文件添加到堆栈中,依此类推,直到文件数组中的所有文件都上传完毕。
2 回答
ABOUTYOU
TA贡献1812条经验 获得超5个赞
我想你可以试试这个:
from(filesArray)
.pipe(
mergeMap(file => service.uploadFile(file), 3)
)
这假设service.uploadFile返回一个承诺或一个可观察的。
假设您有 5 个文件,那么将从前 3 个文件创建 3 个可观察对象,当其中一个完成时,将获取第 4 个文件并从中创建一个新的可观察对象,依此类推。
偶然的你
TA贡献1841条经验 获得超3个赞
用作Subject队列,并且mergeMap有一个并发参数,您可以限制最大并发数
const queue=new Subject()
queque.asObservable().pipe(mergeMap(item=>httpCall(item),3)
queue.next(item)
添加回答
举报
0/150
提交
取消