为了账号安全,请及时绑定邮箱和手机立即绑定

Project Reactor 将发布者一分为二,至少有两个订阅者

Project Reactor 将发布者一分为二,至少有两个订阅者

饮歌长啸 2023-10-19 18:21:05
如何在 Reactor 中将发布者分成两个,这样就存在两个相同的数据流,可以在不同流的下游进行处理?因此我可以映射每个流并单独订阅每个流。我在 API 中看不到任何表明这是在API上的内容。我需要等到两个订阅者都启动并准备好后才能发布。
查看完整描述

1 回答

?
函数式编程

TA贡献1807条经验 获得超9个赞

感谢您的输入并没有直接思考,当然只是有多个订阅者,如下所示:


  val flux = Flux.just("MyData1", "MyData2", "MyData3");


  flux.doOnNext { println("Subscribing one$it") }.subscribe()


  flux.doOnNext { println("Subscribing Two$it") }.subscribe()

将输出:


Subscribing oneMyData1

Subscribing oneMyData2

Subscribing oneMyData3

Subscribing TwoMyData1

Subscribing TwoMyData2

Subscribing TwoMyData3

正如上面所建议的,有Share,但这个 API 不允许设置最小订阅者数量,因此最好调用下面的函数,就我而言,我想等到我们有两个订阅者。文档指出


首次订阅的 Flux 会导致源 Flux 订阅一次,因此迟到的订阅者可能会错过项目。


val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)

这会产生以下输出,以确保在启动第二个订阅者出现延迟时不会丢失消息。


Subscribing oneMyData1

Subscribing TwoMyData1

Subscribing oneMyData2

Subscribing TwoMyData2

Subscribing oneMyData3

Subscribing TwoMyData3


查看完整回答
反对 回复 2023-10-19
  • 1 回答
  • 0 关注
  • 87 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信