为了账号安全,请及时绑定邮箱和手机立即绑定

将数据库和网络调用与 RxJava2 结合

将数据库和网络调用与 RxJava2 结合

慕哥9229398 2021-08-19 21:31:55
我有 2 个数据源:数据库(缓存)和 api,我需要将它们组合成一个流。我知道我可以简单地使用 concatArray 或类似的东西,但我想实现更复杂的行为:最多可发射 2 个元素的可观察流。它将在开始时订阅这两个来源。如果 api 调用足够快(<~300 毫秒),它将仅从中发出数据并完成流。如果 api 调用会很慢(>~300ms),从数据库发出数据并仍然等待来自 api 的数据如果 api 调用不会成功,则从数据库发出数据并发出错误。如果数据库以某种方式比 api 慢,它就不能发出它的数据(流完成解决了这个问题)我用以下代码完成了它:   public Observable<Entity> getEntity() {    final CompositeDisposable disposables = new CompositeDisposable();    return Observable.<Entity>create(emitter -> {        final Entity[] localEntity = new Entity[1];        //database call:        disposables.add(database.getEntity()                .subscribeOn(schedulers.io())                .doOnSuccess(entity -> localEntity[0] = entity) //saving our entity because                                                         //apiService can emit error before 300 ms                 .delay(300, MILLISECONDS)                .subscribe((entity, throwable) -> {                    if (entity != null && !emitter.isDisposed()) {                        emitter.onNext(entity);                    }                }));        //network call:        disposables.add(apiService.getEntity()                .subscribeOn(schedulers.io())                .onErrorResumeNext(throwable -> {                    return Single.<Entity>error(throwable) //we will delay error here                            .doOnError(throwable1 -> {                                if (localEntity[0] != null) emitter.onNext(localEntity[0]); //api error, emit localEntity                            })                            .delay(200, MILLISECONDS, true); //to let it emit localEntity before emitting error                })                .subscribe(entity -> {                    emitter.onNext(entity);                     emitter.onComplete(); //we got entity from api, so we can complete the stream                }, emitter::onError));    })代码有点笨重,我在 observable 中创建了 observables,我认为这是不好的。但是这样我就可以全局访问发射器,这使我能够以我想要的方式控制主流(发射数据、成功、错误)。有没有更好的方法来实现这一目标?我很想看一些代码示例。谢谢!
查看完整描述

3 回答

?
慕妹3242003

TA贡献1824条经验 获得超6个赞

可能是下面的代码可以完成这项工作。根据您的要求,我假设 api 和数据库处理Single<Entity>.


private static final Object STOP = new Object();


public static void main(String[] args) {

    Database database = new Database(Single.just(new Entity("D1")));

    ApiService apiService = new ApiService(Single.just(new Entity("A1")));

    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(500, MILLISECONDS));

    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));

    BehaviorSubject<Object> subject = BehaviorSubject.create();


    Observable.merge(

        apiService.getEntity()

                  .toObservable()

                  .doOnNext(t -> subject.onNext(STOP))

                  .doOnError(e -> subject.onNext(STOP))

                  .onErrorResumeNext(t ->

                                        Observable.concatDelayError(database.getEntity().toObservable(),

                                                                    Observable.error(t))),

        database.getEntity()

                .delay(300, MILLISECONDS)

                .toObservable()

                .takeUntil(subject)

    )

    .subscribe(System.out::println, 

               System.err::println);


    Observable.timer(1, MINUTES) // just for blocking the main thread

              .toBlocking()

              .subscribe();

}

我没能取出使用的Subject从因条件“如果数据库在某种程度上会比API更慢,它不能发射其数据”和“如果API调用将是缓慢的(>〜300毫秒),排出数据数据库并仍然等待来自 api 的数据”。否则,amb()运算符将是一个很好的用途。


查看完整回答
反对 回复 2021-08-19
?
MMMHUHU

TA贡献1834条经验 获得超8个赞

另一种解决方案可能是这个(没有主题):


public static void main(String[] args) throws InterruptedException {

    Database database = new Database(Single.just(new Entity("D1")));

    ApiService apiService = new ApiService(Single.just(new Entity("A1")));

    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));

    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));


    database.getEntity()

            .toObservable()

            .groupJoin(apiService.getEntity()

                                 .toObservable()

                                 .onErrorResumeNext(

                                    err -> Observable.concatDelayError(database.getEntity().toObservable(),

                                                                       Observable.error(err))),

                       dbDuration -> Observable.timer(300, MILLISECONDS),

                       apiDuration -> Observable.never(),

                       (db, api) -> api.switchIfEmpty(Observable.just(db)))

            .flatMap(o -> o)

            .subscribe(System.out::println,

                       Throwable::printStackTrace,

                       () -> System.out.println("It's the end!"));


    Observable.timer(1, MINUTES) // just for blocking the main thread

              .toBlocking()

              .subscribe();

}

如果 API 服务在 300 毫秒 ( dbDuration -> timer(300, MILLISECONDS))内没有发出任何内容,则从数据库中发出实体 ( api.switchIfEmpty(db))。


如果 api 在 300 毫秒内发出某些内容,则 仅发出其Entity( api.switchIfEmpty(.))。


这似乎也如您所愿...


查看完整回答
反对 回复 2021-08-19
?
呼啦一阵风

TA贡献1802条经验 获得超6个赞

另一个更好的解决方案:


public static void main(String[] args) throws InterruptedException {

    Database database = new Database(Single.just(new Entity("D1")));

    ApiService apiService = new ApiService(Single.just(new Entity("A1")));

    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));

    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));


    Observable<Entity> apiServiceWithDbAsBackup =

            apiService.getEntity()

                      .toObservable()

                      .onErrorResumeNext(err -> 

                            Observable.concatDelayError(database.getEntity().toObservable(), Observable.error(err)));


    Observable.amb(database.getEntity()

                           .toObservable()

                           .delay(300, MILLISECONDS)

                           .concatWith(apiServiceWithDbAsBackup),

                   apiServiceWithDbAsBackup)

              .subscribe(System.out::println,

                         Throwable::printStackTrace,

                         () -> System.out.println("It's the end!"));

我们使用amb()延迟到数据库 observable 来获取将发出的第一个。如果 api 服务出错,我们会从数据库中发出项目。这似乎也如您所愿...



查看完整回答
反对 回复 2021-08-19
  • 3 回答
  • 0 关注
  • 199 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信