为了账号安全,请及时绑定邮箱和手机立即绑定

将 Single 应用于 ObservableSource 而不是过度阅读

将 Single 应用于 ObservableSource 而不是过度阅读

波斯汪 2023-02-23 10:53:22
总的来说,我对 RX 很陌生,尤其是 rxjava,请原谅我的错误。此操作依赖于两个异步操作。第一个使用过滤器函数尝试从异步 Observable 返回的列表中获取单个实体。第二个是与设备通信并生成状态更新的 Observable 的异步操作。我想获取从过滤器函数创建的 Single,将其应用于pairReader(...),并订阅其 Observable 以获取更新。我可以让它如图所示工作,但前提是我包含注释take(1),否则我会得到一个异常,因为链试图从 Single 中提取另一个值。  Observable<DeviceCredential> getCredentials() {    return deviceCredentialService()            .getCredentials()            .flatMapIterable(event -> event.getData());  }  Single<Organization> getOrgFromCreds(String orgid) {    return getCredentials()      // A device is logically constrained to only have a single cred per org      .map(DeviceCredential::getOrganization)      .filter(org -> org.getId().equals(orgid))      .take(1)  // Without this I get an exception      .singleOrError();  }  Function<Organization, Observable<Reader.EnrollmentState>> pairReader(String name) {    return org -> readerService().pair(name, org);  }getOrgFromCreds(orgid)  .flatMapObservable(pairReader(readerid))  .subscribe(state -> {     switch(state) {       case BEGUN:         LOG.d(TAG, "Pairing begun");         break;       case PAIRED:         LOG.d(TAG, "Pairing success");         callback.success();         break;       case NOTIFIED_SERVER:         LOG.d(TAG, "Pairing server notified");         break;     }},     error -> {       Crashlytics.logException(error);       callback.error(error.getLocalizedMessage());     });
查看完整描述

3 回答

?
噜噜哒

TA贡献1784条经验 获得超7个赞

如果我没看错,您需要使用之前检索到的异步数据执行一些操作。因此,您可以使用.zip()运算符。这是一个例子:

Observable.zip(
                getOrgFromCreds().toObservable(),
                getCredentials(),
                (first, second) -> /*create output object here*/)
                .subscribe(
                        (n) -> /*do onNext*/,
                        (e) -> /*do onError*/
                );

请注意,该.zip()运算符将等待两个流的发射,然后它将使用您在“此处创建输出对象”中提供的函数创建外部发射。如果您不想等待这两个项目 - 您可以使用.combineLatest()


查看完整回答
反对 回复 2023-02-23
?
SMILET

TA贡献1796条经验 获得超4个赞

如果源流发出不止一项,singleOrError()则应该发出错误。文档

对于您的情况,请使用first()firstOrError()代替。

  Single<Organization> getOrgFromCreds(String orgid) { 
     return getCredentials()
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .firstOrError();
  }


查看完整回答
反对 回复 2023-02-23
?
回首忆惘然

TA贡献1847条经验 获得超11个赞

这里的问题原来是 API 的设计方式很奇怪(不幸的是,文档非常糟糕)。我不明白为什么我得到重复项,并认为我使用flatMapIterable不正确。


该deviceCredentialService.getCredentials()调用实际创建的是一个可观察对象,它发出DataEvent对象,这些对象是对结果列表的简单包装,并带有结果来源的标志。


API 设计者希望允许用户使用本地缓存的数据立即填充 UI,同时执行对 REST API 的较长请求。该DataEvent.from属性是一个枚举,用于标记来自本地设备缓存或来自远程 API 调用的来源。


我解决这个问题的方法是简单地忽略来自本地缓存的结果,只从 API 发出结果:


  Observable<DeviceCredential> getCredentials() {

    return deviceCredentialService()

      .getCredentials()

      // Only get creds from network

      .filter(e -> e.getFrom() == SyncedDataSourceObservableFactory.From.SOURCE)

      .flatMapIterable(e -> e.getData());

  }


  Single<Organization> getOrgFromCreds(String orgid) {

    return getCredentials()

      // A device is logically constrained to only have a single cred per org

      .map(DeviceCredential::getOrganization)

      .filter(org -> org.getId().equals(orgid))

      .singleOrError();

  }

然后计划是使用记忆化缓存实体,使实施应用程序能够访问缓存失效。由于提供的接口不允许抑制 API 调用,因此如果应用程序感觉它是新鲜的,则无法仅使用缓存。


查看完整回答
反对 回复 2023-02-23
  • 3 回答
  • 0 关注
  • 121 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信