为了账号安全,请及时绑定邮箱和手机立即绑定

如何等待 JavaRx2 Flowable 完成所有任务?

如何等待 JavaRx2 Flowable 完成所有任务?

catspeake 2023-08-04 14:51:14
我正在尝试学习 RxJava2 库的基础知识,现在我陷入了以下时刻:我已经生成了myFlowablevia Flowable.generate(...),现在我需要等待所有任务完成执行,然后才能继续下一步。这是展示问题的代码:myFlowable.parallel()            .runOn(Schedulers.computation())            .map(val -> myCollection.add(val))            .sequential()            .subscribe(val -> {                System.out.println("Thread from subscribe: " + Thread.currentThread().getName());                System.out.println("Value from subscribe: " + val.toString());            });    System.out.println("Before sleep - Number of objects: " + myCollection.size());    try {        Thread.sleep(1000);        System.out.println("After sleep - Number of objects: " + myCollection.size());    } catch (InterruptedException e) {        e.printStackTrace();    }我运行所有任务并将结果添加到集合中。如果我在 myFlowable 块之后立即检查集合大小,那么如果我在small之后检查它,情况将会有所不同Thread.sleep()。有什么方法可以检查所有任务是否已完成执行并且我们可以进一步进行?任何帮助或指导将不胜感激。
查看完整描述

3 回答

?
拉风的咖菲猫

TA贡献1995条经验 获得超2个赞

使用Flowable::blockingSubscribe()- 将当前 Flowable 运行到终端事件,忽略任何值并重新抛出任何异常。

查看完整回答
反对 回复 2023-08-04
?
守着星空守着你

TA贡献1799条经验 获得超8个赞

由于 RxJava 是异步的,observable 下面的 java 代码将运行,而 observable 将在不同的线程中运行,这就是为什么如果您想在 Flowable 已完成发送数据时收到通知,您应该在 RxJava 流中执行此操作。为此,您有一个运算符 .doOnComplete 这里有一个示例如何检测流何时完成


        Flowable.range(0, 100).parallel()

            .runOn(Schedulers.computation())

            .map(integer -> {


                return integer;

            })

            .sequential()

            .doOnComplete(() -> {

                System.out.println("finished");

            })

            .subscribe(integer -> System.out.println(integer));


查看完整回答
反对 回复 2023-08-04
?
小唯快跑啊

TA贡献1863条经验 获得超2个赞

您可以使用 AtomicBoolean,将其初始化为 false 并使用 将其设置为 true doFinally()。


doFinally()在 Observable 发出 onError 或 onCompleted 信号后调用,或者被下游处理。


然后让主线程休眠,直到completedvalue 为 true。


使用你的例子:


AtomicBoolean completed = new AtomicBoolean(false);


myFlowable.parallel()

            .runOn(Schedulers.computation())

            .map(val -> myCollection.add(val))

            .sequential()

            .doFinally(() -> completed.set(true))

            .subscribe(val -> {

                ...

            });

    ...

try {

   while(!completed.get()){

       Thread.sleep(1000);

       ...

   }

  ...

} catch (InterruptedException e) {

  e.printStackTrace();

}


查看完整回答
反对 回复 2023-08-04
  • 3 回答
  • 0 关注
  • 147 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信