The first version was not reliable, so I hacked together another one:
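import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;

static <T> Flux<T> expandOnLastItem(Supplier<Flux<T>> seed, Function<T, Flux<T>> generator) {
    // Holds the most recent value seen. Flux.just captures it once per call,
    // so subscribe to the returned Flux only once.
    return Flux.just(new AtomicReference<T>())
            .flatMap(last -> Flux.just(seed.get().materialize())
                    .flatMap(Function.identity())
                    .expand(v -> {
                        if (v.hasValue()) {
                            // Remember the latest item of the current page.
                            last.set(v.get());
                        } else if (v.isOnComplete() && last.get() != null) {
                            // The page finished: request the next one from its last item.
                            Flux<T> res = generator.apply(last.get());
                            last.set(null);
                            return res.materialize();
                        }
                        return Flux.empty();
                    })
                    // Drop the per-page completion signals, then unwrap the values.
                    .filter(s -> !s.isOnComplete())
                    .dematerialize());
}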
It can be used like this:
static Flux<Integer> getPage(int pageId, int size) {
    return Flux.defer(() -> {
        if (pageId < 3) {
            System.out.println("Returning data for pageId: " + pageId);
            return Flux.range(pageId * 100, size);
        } else {
            System.out.println("Returning empty for pageId: " + pageId);
            return Flux.empty();
        }
    });
}

expandOnLastItem(
        () -> getPage(0, 5),
        lastId -> {
            System.out.println(" Expanding. Last item: " + lastId);
            int curPage = lastId / 100;
            return getPage(curPage + 1, 5);
        })
        .reduce(0L, (count, value) -> {
            System.out.println("==> " + value);
            return count + 1;
        })
        .block();
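To make the intended control flow easier to follow, here is a blocking, non-reactive sketch of the same idea using only java.util. It is not Reactor code and not a replacement for the version above. The names PagingSketch and expandOnLastItemBlocking are hypothetical, and getPage is re-implemented with lists purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, blocking analogue of expandOnLastItem (illustration only).
class PagingSketch {

    // Collects the seed page, then keeps asking the generator for the next
    // page, keyed by the last item seen, until a page comes back empty.
    static <T> List<T> expandOnLastItemBlocking(Supplier<List<T>> seed,
                                                Function<T, List<T>> generator) {
        List<T> out = new ArrayList<>();
        List<T> page = seed.get();
        while (!page.isEmpty()) {
            out.addAll(page);
            T last = page.get(page.size() - 1);
            page = generator.apply(last);
        }
        return out;
    }

    // List-based stand-in for getPage: pages 0..2 hold `size` items each,
    // starting at pageId * 100; any later page is empty.
    static List<Integer> getPage(int pageId, int size) {
        List<Integer> items = new ArrayList<>();
        if (pageId < 3) {
            for (int i = 0; i < size; i++) {
                items.add(pageId * 100 + i);
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<Integer> all = expandOnLastItemBlocking(
                () -> getPage(0, 5),
                lastId -> getPage(lastId / 100 + 1, 5));
        // Prints "15 items, last = 204"
        System.out.println(all.size() + " items, last = " + all.get(all.size() - 1));
    }
}
```

As in the reactive version, the generator is only called when the previous page produced at least one item, so an empty page ends the sequence.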
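So I hacked it by mutating a state variable in the generator. It works, but it is not very functional in style. If anyone can suggest an alternative, I would greatly appreciate it.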
import java.util.Random;
import java.util.function.Function;
import reactor.core.publisher.Flux;

// State is not defined in this answer; the code only needs a no-arg
// constructor and mutable int fields `iteration` and `secret`.
Random rand = new Random(1024);
Flux.<Flux<String>, State>generate(State::new, (state, sink) -> {
            if (state.iteration < 4) {
                final int count = rand.nextInt(10) + 1;
                System.out.println(String.format("*** Generate %d: start %d (count %d)", state.iteration, state.secret, count));
                Flux<Integer> inner = Flux.range(state.secret, count);
                // Single-element array so the lambdas below can write to it.
                final int[] last = {Integer.MIN_VALUE};
                sink.next(
                        inner
                                .doOnNext(value -> {
                                    last[0] = value;
                                })
                                .map(value -> String.format("Iter %d value %d", state.iteration, value))
                                .doOnComplete(() -> {
                                    // Side effect: feed the last item back into the generator state.
                                    System.out.println(String.format("Inner complete (last item was %d)", last[0]));
                                    state.secret = last[0];
                                    state.iteration += 1;
                                }));
            } else {
                System.out.println("Generate complete");
                sink.complete();
            }
            return state;
        })
        .flatMap(Function.identity())
        .map(value -> {
            System.out.println(String.format("Ext map: %s", value));
            return value;
        })
        .buffer(5)
        .flatMapIterable(Function.identity())
        .subscribe(value -> System.out.println(String.format(" ---> %s", value)));
System.out.println("Exiting");
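The generator snippet above references a State type that is not shown. A minimal sketch, assuming it is nothing more than a mutable holder: the field names come from the code above, while the zero initial values are an assumption.

```java
// Hypothetical state holder for the Flux.generate example; field names are
// taken from the snippet, the initial values of 0 are an assumption.
class State {
    int iteration = 0; // number of inner sequences that have completed so far
    int secret = 0;    // start value for the next inner Flux.range
}
```

Because doOnComplete writes to these fields as a side effect, the next generator call only sees the updated values if the previous inner sequence has already completed, which is part of why this approach is fragile.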