为了账号安全,请及时绑定邮箱和手机立即绑定

在 RxJs 中实现具有固定堆栈的排队系统

在 RxJs 中实现具有固定堆栈的排队系统

慕标5832272 2022-10-21 15:01:44
假设我想要一个队列,其中任何时候只异步处理 3 个项目,我该怎么做?这就是我的意思:如果我有一组项目要上传到后端,即将一些人工制品上传到云存储,然后创建/更新一个文档以反映每个人工制品的 url,我不想:在下一次之前异步/等待每个上传操作 - 因为这会很慢同时发送所有内容 - 这可能导致写入热点或速率限制做一个 promise.race - 这最终导致(2)做一个 promise.all - 如果有一个长时间运行的上传,这个过程会变慢。我想做的是:有一个所有上传的队列,比如使用 RxJs 创建方法,例如from(array-of-upload-items)在任何时候处理 3 个项目的堆栈。当一个项目离开堆栈即完成时,我们将一个新项目添加到队列中确保在任何一点,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈。我将如何使用 RxJs 来解决这个问题?编辑:2020 年 6 月 27 日这就是我的想法:const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so      rxQueue        .pipe(          mergeMap((item) =>            of(item).pipe(              tap(async (item) => {                  await Promise.race([                      processUpload(item[0]),                      processUpload(item[1]),                      processUpload(item[2]),                  ])              }),            ),            3          ),        )        .subscribe()目标是确保在任何时候都处理(上传)3 个文件,以至于如果一个文件上传过程结束,则添加另一个文件以将堆栈保持在 3 个上传过程中。同理,如果 2 个文件上传同时结束,则将 2 个新文件添加到堆栈中,依此类推,直到文件数组中的所有文件都上传完毕。
查看完整描述

2 回答

?
ABOUTYOU

TA贡献1812条经验 获得超5个赞

我想你可以试试这个:


from(filesArray)

  .pipe(

    mergeMap(file => service.uploadFile(file), 3)

  )

这假设service.uploadFile返回一个承诺或一个可观察的。


假设您有 5 个文件,那么将从前 3 个文件创建 3 个可观察对象,当其中一个完成时,将获取第 4 个文件并从中创建一个新的可观察对象,依此类推。


查看完整回答
反对 回复 2022-10-21
?
偶然的你

TA贡献1841条经验 获得超3个赞

用作Subject队列,并且mergeMap有一个并发参数,您可以限制最大并发数


const queue=new Subject()

queque.asObservable().pipe(mergeMap(item=>httpCall(item),3)

queue.next(item)


查看完整回答
反对 回复 2022-10-21
  • 2 回答
  • 0 关注
  • 107 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信