1 回答
TA贡献1921条经验 获得超9个赞
好的,我找到原因了。这里并没有发生预取;实际上是 Flux.defer 在被订阅时就加载了下一页,而不是等到下游发出请求(request)时才加载。
下面是一个快速(但不够优雅)的验证性解决办法:
/**
 * Streams the entities of a token-paginated source, loading each following page
 * only once the downstream actually requests more elements.
 *
 * <p>{@code fct} is applied to {@code token} to obtain a page. The page's entities
 * are emitted first; if the page carries a non-null {@code token}, the entities of
 * the following pages (obtained by calling this method recursively with that token)
 * are appended.
 *
 * @param token token passed to {@code fct} to fetch the first page of this call
 * @param fct   function that loads the page identified by a token
 * @return a flux of the entities of this page followed by those of all later pages
 */
Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {
    return fct.apply(token).flatMapMany(page -> {
        if (page.token == null) {
            // no more pages
            return Flux.fromIterable(page.entities);
        }
        return Flux
                .fromIterable(page.entities)
                .concatWith(
                        // Flux.defer(() -> load(page.token, fct)) would load the next page as soon
                        // as it is subscribed; this bridge waits for the first request instead.
                        Flux.create(s -> {
                            // One-element array: a mutable holder the lambdas below can capture.
                            DelegateSubscriber[] ref = new DelegateSubscriber[1];
                            // Propagate downstream cancellation to the inner subscription (if it was
                            // ever started) so a cancelled consumer does not keep the page load alive.
                            s.onCancel(() -> {
                                if (ref[0] != null) {
                                    ref[0].cancel();
                                }
                            });
                            s.onRequest(l -> {
                                if (ref[0] == null) {
                                    // First demand: only now subscribe to (and thus load) the next page.
                                    ref[0] = new DelegateSubscriber(s);
                                    load(page.token, fct).subscribe(ref[0]);
                                }
                                // Forward exactly the requested amount to the next page's stream.
                                ref[0].request(l);
                            });
                        }));
    });
}
/**
 * Subscriber that relays every signal it receives to a {@link FluxSink}.
 *
 * <p>It never requests elements on its own: the owner is expected to call
 * {@code request(long)} on it to forward downstream demand.
 */
static class DelegateSubscriber extends BaseSubscriber<Object> {
    /** Sink that receives the relayed next/error/complete signals. */
    FluxSink<Object> delegate;

    /**
     * @param delegate sink to which all received signals are forwarded
     */
    public DelegateSubscriber(final FluxSink<Object> delegate) {
        this.delegate = delegate;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Intentionally empty: do not request anything here. Demand is
        // forwarded explicitly through request(long) by the owner of this subscriber.
    }

    @Override
    protected void hookOnNext(Object value) {
        delegate.next(value);
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        delegate.error(throwable);
    }

    @Override
    protected void hookOnComplete() {
        // Without this the sink is never completed and the flux built on it
        // would hang forever after the last element.
        delegate.complete();
    }
}
添加回答
举报