1 回答
TA贡献1807条经验 获得超9个赞
感谢您的输入并没有直接思考,当然只是有多个订阅者,如下所示:
val flux = Flux.just("MyData1", "MyData2", "MyData3");
flux.doOnNext { println("Subscribing one$it") }.subscribe()
flux.doOnNext { println("Subscribing Two$it") }.subscribe()
将输出:
Subscribing oneMyData1
Subscribing oneMyData2
Subscribing oneMyData3
Subscribing TwoMyData1
Subscribing TwoMyData2
Subscribing TwoMyData3
正如上面所建议的,有Share,但这个 API 不允许设置最小订阅者数量,因此最好调用下面的函数,就我而言,我想等到我们有两个订阅者。文档指出
首次订阅的 Flux 会导致源 Flux 订阅一次,因此迟到的订阅者可能会错过项目。
val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)
这会产生以下输出,以确保在启动第二个订阅者出现延迟时不会丢失消息。
Subscribing oneMyData1
Subscribing TwoMyData1
Subscribing oneMyData2
Subscribing TwoMyData2
Subscribing oneMyData3
Subscribing TwoMyData3
添加回答
举报