3 回答
TA贡献1824条经验 获得超6个赞
可能是下面的代码可以完成这项工作。根据您的要求,我假设 api 和数据库处理Single<Entity>.
private static final Object STOP = new Object();
public static void main(String[] args) {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(500, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
BehaviorSubject<Object> subject = BehaviorSubject.create();
Observable.merge(
apiService.getEntity()
.toObservable()
.doOnNext(t -> subject.onNext(STOP))
.doOnError(e -> subject.onNext(STOP))
.onErrorResumeNext(t ->
Observable.concatDelayError(database.getEntity().toObservable(),
Observable.error(t))),
database.getEntity()
.delay(300, MILLISECONDS)
.toObservable()
.takeUntil(subject)
)
.subscribe(System.out::println,
System.err::println);
Observable.timer(1, MINUTES) // just for blocking the main thread
.toBlocking()
.subscribe();
}
我没能取出使用的Subject从因条件“如果数据库在某种程度上会比API更慢,它不能发射其数据”和“如果API调用将是缓慢的(>〜300毫秒),排出数据数据库并仍然等待来自 api 的数据”。否则,amb()运算符将是一个很好的用途。
TA贡献1834条经验 获得超8个赞
另一种解决方案可能是这个(没有主题):
public static void main(String[] args) throws InterruptedException {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
database.getEntity()
.toObservable()
.groupJoin(apiService.getEntity()
.toObservable()
.onErrorResumeNext(
err -> Observable.concatDelayError(database.getEntity().toObservable(),
Observable.error(err))),
dbDuration -> Observable.timer(300, MILLISECONDS),
apiDuration -> Observable.never(),
(db, api) -> api.switchIfEmpty(Observable.just(db)))
.flatMap(o -> o)
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("It's the end!"));
Observable.timer(1, MINUTES) // just for blocking the main thread
.toBlocking()
.subscribe();
}
如果 API 服务在 300 毫秒 ( dbDuration -> timer(300, MILLISECONDS))内没有发出任何内容,则从数据库中发出实体 ( api.switchIfEmpty(db))。
如果 api 在 300 毫秒内发出某些内容,则 仅发出其Entity( api.switchIfEmpty(.))。
这似乎也如您所愿...
TA贡献1802条经验 获得超6个赞
另一个更好的解决方案:
public static void main(String[] args) throws InterruptedException {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
Observable<Entity> apiServiceWithDbAsBackup =
apiService.getEntity()
.toObservable()
.onErrorResumeNext(err ->
Observable.concatDelayError(database.getEntity().toObservable(), Observable.error(err)));
Observable.amb(database.getEntity()
.toObservable()
.delay(300, MILLISECONDS)
.concatWith(apiServiceWithDbAsBackup),
apiServiceWithDbAsBackup)
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("It's the end!"));
我们使用amb()延迟到数据库 observable 来获取将发出的第一个。如果 api 服务出错,我们会从数据库中发出项目。这似乎也如您所愿...
添加回答
举报