1 回答
TA贡献1848条经验 获得超10个赞
Reactor 和 RxJava 一样,可以被认为是并发不可知的。也就是说,它不强制执行并发模型。相反,它让您(开发人员)掌握一切。但是,这并不妨碍该库帮助您处理并发。
获得aFlux
或aMono
并不一定意味着它运行在专用的Thread中。相反,大多数运算符继续在前一个运算符执行的线程中工作。subscribe()
除非指定,否则最顶层的运算符(源)本身在进行调用的线程上运行。
从您的代码中,以下代码段:
webClient.post() .uri("/resource") .syncBody(res) .header("Content-Type", "application/json") .header("Accept", "application/json") .retrieve() .bodyToMono(Resource.class)
导致线程从main切换到netty 的工作池。之后,以下所有操作均由 netty 工作线程执行。
如果你想控制这种行为,你应该publishOn(...)
在你的代码中添加一条语句,例如:
webClient.post() .uri("/resource") .syncBody(res) .header("Content-Type", "application/json") .header("Accept", "application/json") .retrieve() .bodyToMono(Resource.class) .publishOn(Schedulers.elastic())
这样,弹性调度程序线程池将执行任何后续操作。
另一个例子是使用专用调度程序处理 HTTP 请求执行后的繁重任务。
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import com.github.tomakehurst.wiremock.WireMockServer;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.lanwen.wiremock.ext.WiremockResolver;
import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;
import ru.lanwen.wiremock.ext.WiremockUriResolver;
import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;
@ExtendWith({
WiremockResolver.class,
WiremockUriResolver.class
})
public class ReactiveThreadsControlTest {
private static int concurrency = 1;
private final WebClient webClient = WebClient.create();
@Test
public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {
String requestUri = "/slow-response";
server.stubFor(get(urlEqualTo(requestUri))
.willReturn(aResponse().withStatus(200)
.withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))
);
Flux
.generate(() -> Integer.valueOf(1), (i, sink) -> {
System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));
sink.next(i);
return i + 1;
})
.subscribeOn(Schedulers.single())
.flatMap(i ->
executeGet(uri + requestUri)
.publishOn(Schedulers.elastic())
.map(response -> {
heavyTask();
return true;
})
, concurrency)
.subscribe();
blockForever();
}
private void blockForever() {
Object monitor = new Object();
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException ex) {
}
}
}
private Mono<ClientResponse> executeGet(String path) {
System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));
return webClient
.get()
.uri(path)
.exchange();
}
private void heavyTask() {
try {
System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));
Thread.sleep(TimeUnit.SECONDS.toMillis(20));
} catch (InterruptedException ex) {
}
}
}
添加回答
举报