为了账号安全,请及时绑定邮箱和手机立即绑定

为什么 Completable 和 Observable 之间的订阅时间副作用安排不同?

为什么 Completable 和 Observable 之间的订阅时间副作用安排不同?

慕盖茨4494581 2021-08-04 09:54:07
我目前正在使用 Rx 1。我有以下测试用例:static void printThread(String format, Object... objects) {    System.out.println(String.format("%s %s", Thread.currentThread().getName(),            String.format(format, objects)));}public void testFoo() throws InterruptedException {    Observable.fromCallable(() -> { printThread("callable"); return 1L;})              .subscribeOn(Schedulers.newThread())              .doOnSubscribe(() -> printThread("A"))              .doOnSubscribe(() -> printThread("B"))              .subscribeOn(Schedulers.newThread())              .doOnSubscribe(() -> printThread("C"))              .subscribeOn(Schedulers.newThread())              .doOnSubscribe(() -> printThread("D"))              .toBlocking()              .subscribe();    printThread("next!");    Completable.fromCallable(() -> { printThread("callable"); Thread.sleep(10_000); return 1L;})              .subscribeOn(Schedulers.newThread())              .doOnSubscribe(a -> printThread("A"))              .doOnSubscribe(a -> printThread("B"))              .subscribeOn(Schedulers.newThread())              .doOnSubscribe(a -> printThread("C"))              .subscribeOn(Schedulers.newThread())              .doOnSubscribe(a -> printThread("D"))               .andThen(Completable.fromAction(() -> printThread("E")))               .andThen(Completable.fromAction(() -> printThread("F")).subscribeOn(Schedulers.newThread()))               .await();}产生以下输出:main DRxNewThreadScheduler-1 CRxNewThreadScheduler-2 BRxNewThreadScheduler-2 ARxNewThreadScheduler-3 callablemain next!RxNewThreadScheduler-6 ARxNewThreadScheduler-6 BRxNewThreadScheduler-6 CRxNewThreadScheduler-6 DRxNewThreadScheduler-6 callableRxNewThreadScheduler-6 ERxNewThreadScheduler-7 FProcess finished with exit code 0为什么订阅时间副作用的调度在Observable和之间的工作方式存在差异Completable?我认为发生的事情是对于可观察的行为是产生的,因为订阅发生在默认的单线程调度模式下,除了 wheresubscribeOn()被调用,这就是为什么 A 和 B 发生在同一个线程上,而其他一切都发生在不同的线程上。但我不明白为什么这种行为会被 Completable 改变。
查看完整描述

1 回答

?
杨魅力

TA贡献1811条经验 获得超6个赞

RxJava 1 在这方面有点不一致。1.x在订阅时线程切换之后Completable.subscribeOn调用onSubscribe,而 withObservable则doOnSubscribe在线程切换到上游之前调用。


使用 RxJava 2,它们现在是一致的:


main D

RxNewThreadScheduler-1 C

RxNewThreadScheduler-2 A

RxNewThreadScheduler-2 B

RxNewThreadScheduler-3 callable

main next!

main D

RxNewThreadScheduler-4 C

RxNewThreadScheduler-5 A

RxNewThreadScheduler-5 B

RxNewThreadScheduler-6 callable


查看完整回答
反对 回复 2021-08-04
  • 1 回答
  • 0 关注
  • 208 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信