为了账号安全,请及时绑定邮箱和手机立即绑定

继续在 RxJs 中使用 mergeMap 可管道化的错误

继续在 RxJs 中使用 mergeMap 可管道化的错误

慕村225694 2021-11-12 15:27:04
我正在用 RxJs 管道和 mergeMap 操作符做一些并行的 HTTP get。在第一个请求失败时(让我们想象 /urlnotexists 抛出 404 错误)它会停止所有其他请求。我希望它继续查询所有剩余的 url,而不为这个失败的请求调用所有剩余的 mergeMap。我尝试使用来自 RxJs 的 throwError 和 catchError 但没有成功。索引.jsconst { from } = require('rxjs');const { mergeMap, scan } = require('rxjs/operators');const request = {  get: url => {    return new Promise((resolve, reject) => {      setTimeout(() => {        if (url === '/urlnotexists') { return reject(new Error(url)); }        return resolve(url);      }, 1000);    });  }};(async function() {  await from([    '/urlexists',    '/urlnotexists',    '/urlexists2',    '/urlexists3',  ])    .pipe(      mergeMap(async url => {        try {          console.log('mergeMap 1:', url);          const val = await request.get(url);          return val;        } catch(err) {          console.log('err:', err.message);          // a throw here prevent all remaining request.get() to be tried        }      }),      mergeMap(async val => {        // should not pass here if previous request.get() failed         console.log('mergeMap 2:', val);        return val;      }),      scan((acc, val) => {        // should not pass here if previous request.get() failed         acc.push(val);        return acc;      }, []),    )    .toPromise()    .then(merged => {      // should have merged /urlexists, /urlexists2 and /urlexists3      // even if /urlnotexists failed      console.log('merged:', merged);    })    .catch(err => {      console.log('catched err:', err);    });})();$ node index.jsmergeMap 1: /urlexistsmergeMap 1: /urlnotexistsmergeMap 1: /urlexists2mergeMap 1: /urlexists3err: /urlnotexistsmergeMap 2: /urlexistsmergeMap 2: undefined <- I didn't wanted this mergeMap to have been calledmergeMap 2: /urlexists2mergeMap 2: /urlexists3merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]我希望在最后发出并发 GET 请求并减少它们各自在一个对象中的值。但是如果发生一些错误,我希望他们不要中断我的管道,而是记录它们。有什么建议吗?
查看完整描述

2 回答

?
慕码人2483693

TA贡献1860条经验 获得超9个赞

如果你想使用 RxJS,你应该在catchError与forkJoin.


const { of, from, forkJoin } = rxjs;

const { catchError, tap } = rxjs.operators;


// your promise factory, unchanged (just shorter)

const request = {

  get: url => {

    return new Promise((resolve, reject) => setTimeout(

      () => url === '/urlnotexists' ? reject(new Error(url)) : resolve(url), 1000

    ));

  }

};


// a single rxjs request with error handling

const fetch$ = url => {

  console.log('before:', url);

  return from(request.get(url)).pipe(

    // add any additional operator that should be executed for each request here

    tap(val => console.log('after:', val)),

    catchError(error => {

      console.log('err:', error.message);

      return of(undefined);

    })

  );

};


// concurrently executed rxjs requests

forkJoin(["/urlexists", "/urlnotexists", "/urlexists2", "/urlexists3"].map(fetch$))

  .subscribe(merged => console.log("merged:", merged));

<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>


查看完整回答
反对 回复 2021-11-12
?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

如果你愿意放弃 RXJS 而只是用 async/await 解决它是非常简单的:


const urls = ['/urlexists', '/urlnotexists', '/urlexists2', '/urlexists3']

const promises = urls.map(url => request(url)

const resolved = await Promise.allSettled(promises)


// print out errors

resolved.forEach((r, i) => {

  if (r.status === "rejected') {

    console.log(`${urls[i]} failed: ${r.reason})

  }

})


// get the success results

const merged = resolved.filter(r => r.status === "resolved").map(r => r.value)

console.log('merged', merged)

这利用了Promise.allSettled提议的辅助方法。如果您的环境没有此方法,则可以按照此答案中所示实现它。


查看完整回答
反对 回复 2021-11-12
  • 2 回答
  • 0 关注
  • 109 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信