2 回答
TA贡献1789条经验 获得超10个赞
我找到了一个可行的解决方案,但我不确定它是否是最好的:
我设置了一个用于发送异步消息的 Mono,并将其与一个过滤匹配消息的 Flux 合并。看到 Mono 从不发出值,我知道合并的第一个对象是来自我的 Flux 的响应消息,所以我可以将它转换为正确的类型。
这仍然感觉有点脏,但话又说回来,尝试使用用于异步工作的框架获得阻塞行为总是感觉有点脏......
public AntMessage sendBlocking(AntBlockingMessage requestMessage) {
Flux<AntMessage> response = this.antUsbReader.antMessages()
.filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
.take(1);
Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));
return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));
}
private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {
if (message instanceof RequestMessage) {
return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();
}
return response.getMessageId() == message.getMessageId();
}
TA贡献1848条经验 获得超10个赞
查看您的代码后,我会在球场上提出一些建议。我是用手机写的,所以还没有测试过。
但是我们先写,然后阻塞 1 秒,然后我们返回过滤响应的提取。
Flux<AntMessage> response = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage)) .block(Duration.ofSeconds(1)) .thenReturn(this.antUsbReader.antMessages() .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage)) .take(1));
添加回答
举报