1 回答
TA贡献1853条经验 获得超9个赞
本例中的对象Flux不是ParallelFlux对象,因此它们只会使用一个线程。
如果您创建一个能够处理数千个线程的调度程序,并将其传递给其中一个对象,这并不重要Flux——它们只会坐在那里不被使用,这正是本示例中发生的情况。没有背压,也不会导致异常——它会尽可能快地使用一个线程。
如果要确保Flux充分利用可用的 1024 个线程,则需要调用.parallel(1024):
ParallelFlux<String> flux1 = Flux.merge(source).parallel(1).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(1);
return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));
ParallelFlux<String> flux2 = Flux.merge(source).parallel(1024).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(100);
return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));
如果你对你的代码这样做,那么你会开始看到更接近你似乎期望的结果,尽管它的休眠时间是原来的 100 倍,但还是会过去2_:1_
...
2_17075
2_17076
1_863
1_864
2_17077
1_865
2_17078
2_17079
...
但是,请注意:
我想在发布者之间取得平衡(相对于线程池大小和处理时间)
你不能在这里选择数字来平衡输出,至少不能可靠地或以任何有意义的方式——线程调度将是完全任意的。如果你想这样做,那么你可以使用subscribe 方法的这个变体,允许你显式地调用request()订阅消费者。这样一来,您就可以通过仅请求您准备处理的尽可能多的元素来提供背压。
添加回答
举报