为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Rx java Observable 一次进行多个异步调用(触发和忘记调用)

使用 Rx java Observable 一次进行多个异步调用(触发和忘记调用)

蓝山帝景 2021-11-11 14:14:16
我有一个需要异步调用的下游 api 调用列表(大约 10 个)。到目前为止,我一直在使用 callablesList<RequestContextPreservingCallable <FutureResponse>> callables我会将 api 调用添加到此列表中,并在最后使用 executeAsyncNoReturnRequestContextPreservingCallables 提交它。使用 Rx java Observables 我该怎么做?List<RequestContextPreservingCallable<FutureResponse>> callables = new ArrayList<RequestContextPreservingCallable<FutureResponse>>();callables.add(apiOneConnector.CallToApiOne(name));callables.add(apiTwoConnector.CallToApiTWO(sessionId));....//execute all the callsexecuteAsyncNoReturnRequestContextPreservingCallables(callables);
查看完整描述

1 回答

?
慕神8447489

TA贡献1780条经验 获得超1个赞

您可以使用zip运算符。该zip运营商可以采取多种观测,并同时执行它们,所有的结果已经抵达后,将继续进行。


然后,您可以将这些结果转换为您需要的形式并传递到下一个级别。


按照你的例子。假设您有多个 API 调用来获取名称和会话等,如下所示


Observable.zip(getNameRequest(), getSessionIdRequest(), new BiFunction<String, String, Object>() {

        @Override

        public Object apply(String name, String sessionId) throws Exception {

            // here you will get all the results once everything is completed. you can then take these 

            // results and transform into another object and returnm from here. I decided to transform the results into an Object[]

            // the retuen type of this apply funtion is generic, so you can choose what to return

            return new Object[]{name, sessionId};

        }

    })

    .subscribeOn(Schedulers.io())  // will start this entire chain in an IO thread

    .observeOn(AndroidSchedulers.mainThread()) // observeOn will filp the thread to the given one , so that the downstream will be executed in the specified thread. here I'm switching to main at this point onwards

    .subscribeWith(new DisposableObserver<Object>() {

        @Override

        public void onNext(Object finalResult) {

           // here you will get the final result with all the api results

        }


        @Override

        public void onError(Throwable e) {

            // any error during the entire process will be triggered here

        }


        @Override

        public void onComplete() {

             //will be called once the whole chain is completed and terminated

        }

    });

您甚至可以将 observables 列表传递给zip如下


    List<Observable<String>> requests = new ArrayList<>();

    requests.add(getNameRequest());

    requests.add(getSessionIdRequest());


    Observable.zip(requests, new Function<Object[], Object[]>() {

        @Override

        public Object[] apply(Object[] objects) throws Exception {

            return new Object[]{objects[0], objects[1]};

        }

    }).subscribeWith(new DisposableObserver<Object[]>() {

        @Override

        public void onNext(Object[] objects) {


        }


        @Override

        public void onError(Throwable e) {


        }


        @Override

        public void onComplete() {


        }

    })          


查看完整回答
反对 回复 2021-11-11
  • 1 回答
  • 0 关注
  • 331 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信