1 回答
TA贡献1796条经验 获得超4个赞
这个人为的例子可能会更清楚:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
.flatMap(x -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName())));
这将给出类似的东西:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
或者,换句话说,您的反应式存储库没有在同一个调度程序上发布,这就是您看到您所做的行为的原因。“Up until the next occurrence of publishOn()”并不意味着下次您的代码调用publishOn()- 它也可以在您的任何调用中的任何发布者中flatMap(),您将无法控制。
添加回答
举报