1 回答
TA贡献1853条经验 获得超9个赞
简答
subscribe不会阻塞当前线程,这意味着应用程序主线程可以在 Flux 发出任何元素之前完成。所以要么block在主线程中使用等待,要么使用等待。
细节
调用 no-args subscribe()只是request(unbounded)在Flux不设置任何Subscriber. 它通常在单独的线程中触发操作,但不会阻塞当前线程。最有可能的是,您的主线程在WebClient收到该单独线程中的响应之前就结束了,并且发生了被动副作用doOnNext(...)。
为了说明/测试操作已启动,请在主线程中等待一段时间。只需在subscribe()调用后立即输入以下行:
Thread.sleep(1000);
现在,在使用超时值后,您将能够看到打印的结果。
现在让我们Scheduler为异步操作隐式发送自定义并等待其所有任务完成。另外,让我们传递System.out::printlnassubscribe(...)参数而不是doOnNext,以便完整的代码如下所示:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread
这个例子使用了稍微不同的subscribe(Consumer)。最重要的是,它增加了publishOn(调度),这是由支持ExecutorService。后者用于等待主线程中的终止。
当然,获得相同结果的更简单的方法是使用block()您最初提到的:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
最后,请注意您的第三个示例Flux.just(...)...subscribe()- 似乎它只是在您的主线程终止之前快速完成。那是因为String与发射单个元素相比,它需要更少的时间来发射几个元素GetLocationsResponse(意味着写入请求 + 读取响应 + 解析到 POJO 的时间)。但是,如果您使用它Flux来延迟元素,则会重现相同的行为:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back again
添加回答
举报