为了账号安全,请及时绑定邮箱和手机立即绑定

从 while 循环创建 Flowable

从 while 循环创建 Flowable

蓝山帝景 2022-12-15 16:11:49
我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。先是自己写了dao,处理InputStream,提供指定范围内的Item。目前它只是在列表中收集数据,但我想使用 flowable 一个一个地提供项目;目前它提供Maybe<List<Item>>。还有一些错误需要传输到更高级别(数据源)。比如EndOfFile,通知DataSource数据缓存完毕;Dao.class:List<Item> loadRange(int start, int number) throws ... {    ...    while(...) {        ...        //TODO contribute item to flowable        resultList.add(new Item(...))     }    return resultList;}Maybe<List<Item>>刚刚创建的方法Maybe.fromCallable();
查看完整描述

1 回答

?
千巷猫影

TA贡献1829条经验 获得超7个赞

这样的事情应该适用于此:


Flowable<Item> loadRange(int start, int number) {

        return Flowable.create(emitter -> {

            try {

                while (...){

                    emitter.onNext(new Item());

                }

                emitter.onComplete();

            } catch (IOException e) {

                emitter.onError(e);

            }

        }, BackpressureStrategy.BUFFER);

    }

我假设一旦循环完成你想要完成,也向下游发送错误,而不是处理方法签名。您也可以更改BackPressureStrategy以适合您的用例,即DROP,LATEST等等。


由于您是 RxJava 的新手,匿名类将是:


Flowable<Item> loadRange(int start, int number) {

        return Flowable.create(new FlowableOnSubscribe<Item>() {

            @Override public void subscribe(FlowableEmitter<Item> emitter) {

                try {

                    while (...){

                        emitter.onNext(new Item());

                    }

                    emitter.onComplete();

                } catch (IOException e) {

                    emitter.onError(e);

                }

            }

        }, BackpressureStrategy.BUFFER);

    }


查看完整回答
反对 回复 2022-12-15
  • 1 回答
  • 0 关注
  • 66 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信