为了账号安全,请及时绑定邮箱和手机立即绑定

Reactor - 理解 .flatMap() 中的线程池

Reactor - 理解 .flatMap() 中的线程池

函数式编程 2023-03-31 15:38:28
我试图了解反应式编程是如何工作的。我为此准备了简单的演示:WebClient来自 Spring Framework 的 reactive 将请求发送到简单的 rest api,并且此客户端在每个操作中打印线程名称。休息API:@RestController@SpringBootApplicationpublic class RestApiApplication {    public static void main(String[] args) {        SpringApplication.run(RestApiApplication.class, args);    }    @PostMapping("/resource")    public void consumeResource(@RequestBody Resource resource) {        System.out.println(String.format("consumed resource: %s", resource.toString()));    }}@Data@AllArgsConstructorclass Resource {    private final Long id;    private final String name;}问题是行为与我预期的不同。.map()我预计,.filter()和 的每次调用都.flatMap()将在线程上执行,而ormain的每次调用都将在 nio 线程池中的线程上执行。所以我希望日志看起来像:.doOnSuccess().doOnError------- map [main] --------------- filter [main] --------------- flatmap [main] --------(and so on...)------- onsuccess [reactor-http-nio-2] --------(and so on...)但我得到的日志是:------- map [main] --------------- filter [main] --------------- flatmap [main] --------------- map [main] --------------- filter [main] --------------- flatmap [main] --------------- onsuccess [reactor-http-nio-2] --------------- onsuccess [reactor-http-nio-6] --------------- onsuccess [reactor-http-nio-4] --------------- onsuccess [reactor-http-nio-8] --------------- map [reactor-http-nio-2] --------------- filter [reactor-http-nio-2] --------------- flatmap [reactor-http-nio-2] --------------- map [reactor-http-nio-2] --------每次下一次登录.map(),.filter()都是.flatMap()在 reactor-http-nio 的线程上完成的。下一个难以理解的事实是在主线程和 reactor-http-nio 上执行的操作之间的比率总是不同的。有时所有操作.map(),.filter()和.flatMap()都在主线程上执行。
查看完整描述

1 回答

?
慕桂英546537

TA贡献1848条经验 获得超10个赞

Reactor 和 RxJava 一样,可以被认为是并发不可知的。也就是说,它不强制执行并发模型。相反,它让您(开发人员)掌握一切。但是,这并不妨碍该库帮助您处理并发。

获得aFlux或aMono并不一定意味着它运行在专用的Thread中。相反,大多数运算符继续在前一个运算符执行的线程中工作subscribe()除非指定,否则最顶层的运算符(源)本身在进行调用的线程上运行。

从您的代码中,以下代码段:

webClient.post()
         .uri("/resource")
         .syncBody(res)
         .header("Content-Type", "application/json")
         .header("Accept", "application/json")
         .retrieve()
         .bodyToMono(Resource.class)

导致线程从main切换到netty 的工作池。之后,以下所有操作均由 netty 工作线程执行。

如果你想控制这种行为,你应该publishOn(...)在你的代码中添加一条语句,例如:

webClient.post()
         .uri("/resource")
         .syncBody(res)
         .header("Content-Type", "application/json")
         .header("Accept", "application/json")
         .retrieve()
         .bodyToMono(Resource.class)
         .publishOn(Schedulers.elastic())

这样,弹性调度程序线程池将执行任何后续操作。

另一个例子是使用专用调度程序处理 HTTP 请求执行后的繁重任务。

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;

import static com.github.tomakehurst.wiremock.client.WireMock.get;

import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;


import com.github.tomakehurst.wiremock.WireMockServer;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.web.reactive.function.client.ClientResponse;

import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import reactor.core.scheduler.Schedulers;

import ru.lanwen.wiremock.ext.WiremockResolver;

import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;

import ru.lanwen.wiremock.ext.WiremockUriResolver;

import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;


@ExtendWith({

  WiremockResolver.class,

  WiremockUriResolver.class

})

public class ReactiveThreadsControlTest {


  private static int concurrency = 1;


  private final WebClient webClient = WebClient.create();


  @Test

  public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {


    String requestUri = "/slow-response";


    server.stubFor(get(urlEqualTo(requestUri))

      .willReturn(aResponse().withStatus(200)

        .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))

    );


    Flux

      .generate(() -> Integer.valueOf(1), (i, sink) -> {

        System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));

        sink.next(i);

        return i + 1;

      })

      .subscribeOn(Schedulers.single())

      .flatMap(i ->

          executeGet(uri + requestUri)

            .publishOn(Schedulers.elastic())

            .map(response -> {

              heavyTask();

              return true;

            })

        , concurrency)

      .subscribe();


    blockForever();

  }


  private void blockForever() {

    Object monitor = new Object();


    synchronized (monitor) {

      try {

        monitor.wait();

      } catch (InterruptedException ex) {

      }

    }

  }



  private Mono<ClientResponse> executeGet(String path) {

    System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));

    return webClient

      .get()

      .uri(path)

      .exchange();

  }


  private void heavyTask() {

    try {

      System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));

      Thread.sleep(TimeUnit.SECONDS.toMillis(20));

    } catch (InterruptedException ex) {

    }

  }

}


查看完整回答
反对 回复 2023-03-31
  • 1 回答
  • 0 关注
  • 147 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信