为了账号安全,请及时绑定邮箱和手机立即绑定

合并热通量源

合并热通量源

湖上湖 2022-09-01 17:24:03
在Spring Boot 2 with Reactor中,我试图合并两个热源。但是,似乎唯一一个报告了 中的两个参数中的第一个。我如何识别第二个.FluxmergeFluxmergemergeFlux在下面的示例中,in 甚至不会打印 when 是第一个参数。如果我做第一个,那么不打印。System.errB-2outgoing1aoutgoing2A-2以下是完整的示例;package com.example.demo;import java.time.Duration;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import reactor.core.publisher.Flux;import reactor.core.scheduler.Schedulers;public class Weather {String city;Integer temperature;public Weather(String city, Integer temperature) {    this.city = city;    this.temperature = temperature;}@Overridepublic String toString() {    return "Weather [city=" + city + ", temperature=" + temperature + "]";}public static void main(String[] args) {    BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();    BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();    // Assume Spring @Repository "A-1"    new Thread(() -> {        for (int d = 1; d < 1000; d += 1) {            for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {                queue.add(new Weather(s, d));                try { Thread.sleep(250); } catch (InterruptedException e) {}            }        }    }).start();     // Assume Spring @Repository "B-1"    new Thread(() -> {        for (int d = 1; d < 1000; d += 1) {            for (String s: new String[] {"MOS", "TLV"}) {                queue2.add(new Weather(s, d));                try { Thread.sleep(1000); } catch (InterruptedException e) {}            }        }    }).start();    // Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR    Flux<Weather> outgoing1 = Flux.<Weather>create(        sink -> {            for (int i = 0; i < 1000; i++) {                try {                    sink.next(queue.take());                    System.err.println("1 " + queue.size());                } catch (InterruptedException e) {                    e.printStackTrace();                }            }            sink.complete();        }    )
查看完整描述

1 回答

?
阿晨1998

TA贡献2037条经验 获得超6个赞

这里有一些事情在起作用。

  1. 请注意 .merge 运算符的以下建议...

请注意,合并是为使用异步源或有限源而定制的。当处理尚未在专用计划程序上发布的无限源时,您必须将该源隔离在其自己的计划程序中,否则合并会尝试在订阅另一个源之前将其排出。

  1. 您的出站助焊剂使用 ,但这只影响在运算符之后链接的运算符。即,它不会影响之前的任何内容。具体来说,它不会影响 lambda 中的代码传递到执行的线程。如果您在每个出站通量之前添加,您可以看到这一点。.publishOn.publishOn.publishOnFlux.create.log().publishOn

  2. 您的 lambda 已传递给调用阻塞方法 ()。Flux.createqueue.take

由于您在线程中调用合并的 Flux,因此您的 lambda 将传递给线程中的执行,并阻止它。subscribe(...)mainFlux.createmain

最简单的解决方法是使用而不是使 lambda 中的代码传递到不同的线程(不是 )上运行。这将防止线程阻塞,并允许来自两个出站流的合并输出交错。.subscribeOn.publishOnFlux.createmainmain


查看完整回答
反对 回复 2022-09-01
  • 1 回答
  • 0 关注
  • 97 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信