为了账号安全,请及时绑定邮箱和手机立即绑定

仅在需要时才在 Reactor 的 Flux 中请求下一个

仅在需要时才在 Reactor 的 Flux 中请求下一个

肥皂起泡泡 2023-08-04 19:19:47
我有一个 API,它返回实体列表,实体数量上限为 100 个。如果有更多实体,它将返回下一页的令牌。我想创建一个通量,它返回所有实体(所有页面),但仅在需要时(如果有请求)返回。我写了这段代码:class Page {    String token;    List<Object> entities;}Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {    return fct.apply(token).flatMapMany(page -> {        if (page.token == null) {            // no more pages            return Flux.fromIterable(page.entities);        }        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));    });}它有效 - 几乎如果我请求 99 个元素,则加载第一页,并且我的通量包含 99 个元素。如果我请求 150 个元素,则会加载第一页和第二页,并且我的 Flux 包含 150 个元素。但是,如果我请求 100 个元素,则会加载第一页和第二页(并且我的 Flux 包含 100 个元素)。我的问题是,第二页已加载,但我没有请求第 101 个元素。当前行为:subscribe()=> Function is called to load page 1request(10)=> Received: 0-9request(89)=> Received: 10-98request(1)=> Received: 99=> Function is called to load page 2request(1)=> Received: 100预期是:页面 2 的加载发生在最后一个请求之后(1)几乎就像在某个地方进行了预取,但我看不到在哪里。有任何想法吗?
查看完整描述

1 回答

?
郎朗坤

TA贡献1921条经验 获得超9个赞

好的,我找到了。没有预取。事实上,它是Flux.defer根据订阅加载下一页的,而不是根据请求加载的。


解决这个问题的快速(但肮脏的)测试是:


Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {

    return fct.apply(token).flatMapMany(page -> {

        if (page.token == null) {

            // no more pages

            return Flux.fromIterable(page.entities);

        }


        return Flux

                .fromIterable(page.entities)

                .concatWith(

                        // Flux.defer(() -> load(page.token, fct))

                        Flux.create(s -> {

                            DelegateSubscriber[] ref = new DelegateSubscriber[1];


                            s.onRequest(l -> {

                                if (ref[0] == null) {

                                    ref[0] = new DelegateSubscriber(s);

                                    load(page.token, fct).subscribe(ref[0]);

                                }

                                ref[0].request(l);

                            });

                        }));

    });

}


static class DelegateSubscriber extends BaseSubscriber<Object> {


    FluxSink<Object> delegate;


    public DelegateSubscriber(final FluxSink<Object> delegate) {

        this.delegate = delegate;

    }


    @Override

    protected void hookOnSubscribe(Subscription subscription) {

        // nothing

    }


    @Override

    protected void hookOnNext(Object value) {

        delegate.next(value);

    }


    @Override

    protected void hookOnError(Throwable throwable) {

        delegate.error(throwable);

    }

}


查看完整回答
反对 回复 2023-08-04
  • 1 回答
  • 0 关注
  • 176 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信