2 回答
TA贡献1797条经验 获得超6个赞
不要不断地Scheduler从重新创建 ,ExecutorService而是努力将它直接包装在构造函数中。
您根本不需要CompletableFuture,并且subscribeOn应该应用于内部flatMap以可能为每个调整大小任务选择单独的线程(它从每个 Flux 应用到的池中选择一个线程):
class ImageResizeService {
private final Executor executor; //TODO prefer an ExecutorService if possible
private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)
Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
//we get the requests on IO thread
return Flux.fromIterable(requests)
//for each request, perform asynchronous resize...
.flatMap(r -> Mono
//... by converting the resizeTask Callable to a Mono
.fromCallable(r -> resizeTask(r).get())
//... and making sure it executes on the executor
.subscribeOn(scheduler)
)
.collectList();
}
}
为了实现真正的并行化,您还有另一种选择parallel().runOn()::
Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
//we get the requests on IO thread
return Flux.fromIterable(requests)
//divide into N workloads
//the executor _should_ be capable of this degree of parallelisation:
.parallel(NUMBER_OF_DESIRED_THREADS)
//actually tell to run each workload on a thread picked from executor
.runOn(scheduler)
//here the workload are already running on their dedicated thread,
//we can afford to block it and thus apply resize in a simpler `map`
.map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
//go back to a `Flux` sequence for collection into list
.sequential()
.collectList();
}
添加回答
举报