在Spring Boot 2 with Reactor中,我试图合并两个热源。但是,似乎唯一一个报告了 中的两个参数中的第一个。我如何识别第二个.FluxmergeFluxmergemergeFlux在下面的示例中,in 甚至不会打印 when 是第一个参数。如果我做第一个,那么不打印。System.errB-2outgoing1aoutgoing2A-2以下是完整的示例;package com.example.demo;import java.time.Duration;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import reactor.core.publisher.Flux;import reactor.core.scheduler.Schedulers;public class Weather {String city;Integer temperature;public Weather(String city, Integer temperature) { this.city = city; this.temperature = temperature;}@Overridepublic String toString() { return "Weather [city=" + city + ", temperature=" + temperature + "]";}public static void main(String[] args) { BlockingQueue<Weather> queue = new LinkedBlockingQueue<>(); BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>(); // Assume Spring @Repository "A-1" new Thread(() -> { for (int d = 1; d < 1000; d += 1) { for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) { queue.add(new Weather(s, d)); try { Thread.sleep(250); } catch (InterruptedException e) {} } } }).start(); // Assume Spring @Repository "B-1" new Thread(() -> { for (int d = 1; d < 1000; d += 1) { for (String s: new String[] {"MOS", "TLV"}) { queue2.add(new Weather(s, d)); try { Thread.sleep(1000); } catch (InterruptedException e) {} } } }).start(); // Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR Flux<Weather> outgoing1 = Flux.<Weather>create( sink -> { for (int i = 0; i < 1000; i++) { try { sink.next(queue.take()); System.err.println("1 " + queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } sink.complete(); } )
1 回答
阿晨1998
TA贡献2037条经验 获得超6个赞
这里有一些事情在起作用。
请注意
.merge
运算符的以下建议...
请注意,合并是为使用异步源或有限源而定制的。当处理尚未在专用计划程序上发布的无限源时,您必须将该源隔离在其自己的计划程序中,否则合并会尝试在订阅另一个源之前将其排出。
您的出站助焊剂使用 ,但这只影响在运算符之后链接的运算符。即,它不会影响之前的任何内容。具体来说,它不会影响 lambda 中的代码传递到执行的线程。如果您在每个出站通量之前添加,您可以看到这一点。
.publishOn
.publishOn
.publishOn
Flux.create
.log()
.publishOn
您的 lambda 已传递给调用阻塞方法 ()。
Flux.create
queue.take
由于您在线程中调用合并的 Flux,因此您的 lambda 将传递给线程中的执行,并阻止它。subscribe(...)
main
Flux.create
main
最简单的解决方法是使用而不是使 lambda 中的代码传递到不同的线程(不是 )上运行。这将防止线程阻塞,并允许来自两个出站流的合并输出交错。.subscribeOn
.publishOn
Flux.create
main
main
添加回答
举报
0/150
提交
取消