为了账号安全,请及时绑定邮箱和手机立即绑定

如何在项目反应器中设置阻塞异步请求/响应?

如何在项目反应器中设置阻塞异步请求/响应?

冉冉说 2023-05-10 14:19:48
我正在连接一个 ANT+ USB 记忆棒,并用项目反应器替换我自己的天真“MessageBus”,因为它看起来非常合适。USB 接口本质上是异步的(单独的输入/输出管道),我想以阻塞方式处理一组请求/响应消息。我已经设置了一个单独的线程,它不断地从 usb in-pipe 中读取消息并将它们写入一个接收器,该接收器提供一个共享的 Flux,任何人都可以订阅。这似乎工作正常。目前我向 usb 管道发送一条消息,然后在共享通量上使用 .filter() 和 .blockFirst() :(人为代码)    /**     * Puts a message on the Usb  out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}     *     * @param message Message to send.     * @return related response message.     */    public AntMessage sendBlocking(AntBlockingMessage message) {        send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void        // bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.        return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite)  Flux<AntMessage>                .filter(antMessage -> antMessage.getMessageId() == message.getMessageId())                .blockFirst(Duration.ofSeconds(10));    }问题是 usb 记忆棒甚至在 flux 被激活之前就可以响应,从而导致 TimeoutException。添加一个Thread.sleep(10)到 usb 读卡器“解决”了这个问题,但是实现这种阻塞行为的正确方法是什么?设置订阅(使用 .take(1)),发送消息然后阻塞订阅?设置一个发送和等待正确响应都完成的 Flux?我想不通...
查看完整描述

2 回答

?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

我找到了一个可行的解决方案,但我不确定它是否是最好的:


我设置了一个用于发送异步消息的 Mono,并将其与一个过滤匹配消息的 Flux 合并。看到 Mono 从不发出值,我知道合并的第一个对象是来自我的 Flux 的响应消息,所以我可以将它转换为正确的类型。


这仍然感觉有点脏,但话又说回来,尝试使用用于异步工作的框架获得阻塞行为总是感觉有点脏......


    public AntMessage sendBlocking(AntBlockingMessage requestMessage) {

        Flux<AntMessage> response = this.antUsbReader.antMessages()

                .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))

                .take(1);


        Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));

        return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));

    }


    private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {

        if (message instanceof RequestMessage) {

            return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();

        }

        return response.getMessageId() == message.getMessageId();

    }


查看完整回答
反对 回复 2023-05-10
?
慕桂英546537

TA贡献1848条经验 获得超10个赞

查看您的代码后,我会在球场上提出一些建议。我是用手机写的,所以还没有测试过。

但是我们先写,然后阻塞 1 秒,然后我们返回过滤响应的提取。

Flux<AntMessage> response = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage))
    .block(Duration.ofSeconds(1))
    .thenReturn(this.antUsbReader.antMessages()
            .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
            .take(1));


查看完整回答
反对 回复 2023-05-10
  • 2 回答
  • 0 关注
  • 134 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信