我想创建一个Flux具有按需生成的元素且预取有限的元素。我尝试了以下操作,但看起来这段代码无法处理背压,因为它generateElements变得非常大 (1011):AtomicInteger generateElements = new AtomicInteger(0);Flux<Integer> source = Flux .create(emitter -> { while (true) emitter.next(generateElements.getAndIncrement()); }) .subsribeOn(Schedulers.elastic()) .limitRate(1);source.take(4).subsribe(System.out::println);assertThat(generateElements.get()).isEqualTo(5);我怎样才能使我的Flux预取仅限于一次?
1 回答
森栏
TA贡献1810条经验 获得超5个赞
您可以Flux.generate
在定义单个项目发射的地方使用 which expects a callable:
AtomicInteger generateElements = new AtomicInteger(0); Flux.generate(emitter -> emitter.next(generateElements.getAndIncrement())) .subscribeOn(Schedulers.elastic()) .take(4) .subscribe(System.out::println);
添加回答
举报
0/150
提交
取消