我的WebSocketHandler实现中有这个:@Overridepublic Mono<Void> handle(WebSocketSession session) { return session.send( session.receive() .flatMap(webSocketMessage -> { int id = Integer.parseInt(webSocketMessage.getPayloadAsText()); Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id); var publisher = flux .<String>handle((o, sink) -> { try { sink.next(objectMapper.writeValueAsString(o)); } catch (JsonProcessingException e) { e.printStackTrace(); } }) .map(session::textMessage); return publisher; }) );}当前Flux<EfficiencyData>生成的用于在服务中进行测试如下:public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) { return Flux.interval(Duration.ofSeconds(1)) .map(aLong -> { longAdder.increment(); return new EfficiencyData(new MachineSpeed( RotationSpeed.ofRpm(longAdder.intValue()), RotationSpeed.ofRpm(0), RotationSpeed.ofRpm(400))); }).publish().autoConnect();}我用publish().autoConnect()它来使它成为热流。我创建了一个单元测试,它启动了 2 个线程,这些线程在返回的内容上执行此操作Flux:flux.log().handle((s, sink) -> { LOGGER.info("{}", s.getMachineSpeed().getCurrent()); }).subscribe();在这种情况下,我看到两个线程每秒都打印出相同的值。但是,当我打开 2 个浏览器选项卡时,我在两个网页中看不到相同的值。连接的 websocket 客户端越多,值之间的差异就越大(因此原始 Flux 中的每个值似乎都发送到不同的客户端,而不是发送到所有客户端)。
1 回答
白板的微信
TA贡献1883条经验 获得超3个赞
问题是对于每个连接的 websocket 客户端,我都会调用该service.subscribeToEfficiencyData(id)
方法,每次调用它都会返回一个新的Flux。因此,当然,这些独立的 Flux 不会在不同的 websocket 客户端之间共享。
为了解决这个问题,我Flux
在构造函数或PostConstruct
我的服务的方法中创建实例,以便subscribeToEfficiencyData
每次都返回相同的 Flux 实例。
请注意,.publish().autoConnect()
在 Flux 上仍然很重要,因为没有那个 websocket 客户端将再次看到不同的值!
添加回答
举报
0/150
提交
取消