为了账号安全,请及时绑定邮箱和手机立即绑定

Flux 未订阅 Spring 5 反应堆

Flux 未订阅 Spring 5 反应堆

汪汪一只猫 2021-09-26 18:51:30
我可能遗漏了一些东西,但我无法弄清楚它是什么。下面的代码什么都不做:webClient.get().uri("/some/path/here").retrieve()     .bodyToMono(GetLocationsResponse.class)     .doOnNext(System.out::println)     .subscribe();如果我尝试阻止呼叫它工作正常:webClient.get().uri("/some/path/here").retrieve()      .bodyToMono(GetLocationsResponse.class)      .doOnNext(System.out::println)      .block();奇怪的是,如果我“手动”创建 Flux(即不是来自 spring webClient),这可以正常工作:Flux.just("1", "2", "3")    .filter(s -> !s.equals("2"))    .doOnNext(System.out::println)    .subscribe();有人可以解释一下我做错了什么吗?是不是.subscribe()应该在第一种情况下执行操作,就像在最后一种情况下一样?
查看完整描述

1 回答

?
暮色呼如

TA贡献1853条经验 获得超9个赞

简答


subscribe不会阻塞当前线程,这意味着应用程序主线程可以在 Flux 发出任何元素之前完成。所以要么block在主线程中使用等待,要么使用等待。


细节


调用 no-args subscribe()只是request(unbounded)在Flux不设置任何Subscriber. 它通常在单独的线程中触发操作,但不会阻塞当前线程。最有可能的是,您的主线程在WebClient收到该单独线程中的响应之前就结束了,并且发生了被动副作用doOnNext(...)。


为了说明/测试操作已启动,请在主线程中等待一段时间。只需在subscribe()调用后立即输入以下行:


Thread.sleep(1000);

现在,在使用超时值后,您将能够看到打印的结果。


现在让我们Scheduler为异步操作隐式发送自定义并等待其所有任务完成。另外,让我们传递System.out::printlnassubscribe(...)参数而不是doOnNext,以便完整的代码如下所示:


ExecutorService executor = Executors.newSingleThreadExecutor(); 


webClient.get().uri("/some/path/here").retrieve()

    .bodyToMono(GetLocationsResponse.class)

    .publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor

    .subscribe(System.out::println); //still non-blocking


executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread 

这个例子使用了稍微不同的subscribe(Consumer)。最重要的是,它增加了publishOn(调度),这是由支持ExecutorService。后者用于等待主线程中的终止。


当然,获得相同结果的更简单的方法是使用block()您最初提到的:


webClient.get().uri("/some/path/here").retrieve()

      .bodyToMono(GetLocationsResponse.class)

      .doOnNext(System.out::println)

      .block();

最后,请注意您的第三个示例Flux.just(...)...subscribe()- 似乎它只是在您的主线程终止之前快速完成。那是因为String与发射单个元素相比,它需要更少的时间来发射几个元素GetLocationsResponse(意味着写入请求 + 读取响应 + 解析到 POJO 的时间)。但是,如果您使用它Flux来延迟元素,则会重现相同的行为:


Flux.just("1", "2", "3")

    .filter(s -> !s.equals("2"))

    .delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread

    .doOnNext(System.out::println)

    .subscribe(); 



Flux.just("1", "2", "3")

    .filter(s -> !s.equals("2"))

    .delayElements(Duration.ofMillis(500))

    .doOnNext(System.out::println)

    .blockLast(); //and that makes it printing back again


查看完整回答
反对 回复 2021-09-26
  • 1 回答
  • 0 关注
  • 157 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信