为了账号安全,请及时绑定邮箱和手机立即绑定

我可以为 subscribeOn 方法和异步任务使用相同的执行程序吗

我可以为 subscribeOn 方法和异步任务使用相同的执行程序吗

慕斯709654 2021-08-13 15:53:59
我有一个简单的问题,假设我有一个如下所示的课程:import lombok.Value;import java.nio.file.Path;@Valueclass ImageResizeRequest {    private DownloadedImage downloadedImage;    private ImageSize imageSize;    private Path destinationLocation;}上面的类代表负责将图像调整为给定大小的单个任务。我有很多要求将此图像调整为许多不同的尺寸。@RequiredArgsConstructorclass ImageResizeService {    private final Executor executor;    Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {        return Flux.fromIterable(requests)                .flatMap(this::resize)                .collectList()                .subscribeOn(Schedulers.fromExecutor(executor));    }    private Mono<ImageResizeResult> resize(ImageResizeRequest request) {        return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));    }    private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {        return () -> {            //TODO add image resize logic for example ImageMagick by Im4Java...            /** code below call ImageMagick library             ConvertCmd cmd = new ConvertCmd();             IMOperation op = new IMOperation();             op.quality(100d);             op.addImage(request.getDestinationLocation().toString());             cmd.run(op);             */            //TODO add logic!!!            return new ImageResizeResult(null, null, null, null);        };    }}我的问题是:如何在 Project Reactor 中实现负责调整图像大小的并行独立任务?如果没有项目反应器,我将使用 CompletableFuture 列表:private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {    CompletableFuture<Void> allDoneFuture =        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));    return allDoneFuture.thenApply(v ->            futures.stream().                    map(future -> future.join()).                    collect(Collectors.<T>toList())    );}具有指定的执行程序服务。此外,在我的示例中,我在 subscribeOn 方法和 supplyAsync 中使用相同的执行程序 - 是个好主意吗?
查看完整描述

2 回答

?
FFIVE

TA贡献1797条经验 获得超6个赞

不要不断地Scheduler从重新创建 ,ExecutorService而是努力将它直接包装在构造函数中。


您根本不需要CompletableFuture,并且subscribeOn应该应用于内部flatMap以可能为每个调整大小任务选择单独的线程(它从每个 Flux 应用到的池中选择一个线程):


class ImageResizeService {


  private final Executor executor; //TODO prefer an ExecutorService if possible

  private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)


  Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

    //we get the requests on IO thread

    return Flux.fromIterable(requests)

            //for each request, perform asynchronous resize...

            .flatMap(r -> Mono

                //... by converting the resizeTask Callable to a Mono

                .fromCallable(r -> resizeTask(r).get())

                //... and making sure it executes on the executor

                .subscribeOn(scheduler)

            )

            .collectList();

  }

}

为了实现真正的并行化,您还有另一种选择parallel().runOn()::


Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

    //we get the requests on IO thread

    return Flux.fromIterable(requests)

            //divide into N workloads

            //the executor _should_ be capable of this degree of parallelisation:

            .parallel(NUMBER_OF_DESIRED_THREADS)

            //actually tell to run each workload on a thread picked from executor

            .runOn(scheduler) 

            //here the workload are already running on their dedicated thread,

            //we can afford to block it and thus apply resize in a simpler `map`

            .map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed

            //go back to a `Flux` sequence for collection into list

            .sequential()

            .collectList();

}


查看完整回答
反对 回复 2021-08-13
  • 2 回答
  • 0 关注
  • 225 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信