为了账号安全,请及时绑定邮箱和手机立即绑定

当我从 ReplaySubject 使用 Observable 时阻止

当我从 ReplaySubject 使用 Observable 时阻止

慕仙森 2023-05-17 17:56:28
我正在编写客户端-服务器应用程序。我从数据库中获取数据并将其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因为我需要保证每个客户端上的数据相同)当客户端连接订阅它时,我想将此数据发送给他但是当我尝试它时我的头“可能的方式^_^”它阻止了。通过块我的意思是它不发送数据但是当我关闭服务器数据时立即显示在客户端。我尝试在客户端和服务器端事件循环中添加一些线程(我在想可能是线程块,因为我使用“无限”源所以接收这个我需要另一个线程或类似的东西)。服务器端通道代码:public    class ClientHandler        extends SimpleChannelInboundHandler<DataWrapper> {    private final Observable<DataWrapper> data;    public ClientHandler(Observable<DataWrapper> data) {        this.data = data;    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {        // super.channelRegistered(ctx);        final Channel channel = ctx.channel();        Server            .INSTANCE            .appendToChannelGroup(channel);    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // super.channelActive(ctx);        // i believe there is something wrong        data.subscribe(ctx::writeAndFlush);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }    // rest skip}客户端:public    class DirectNetworkCommunicator        extends SimpleChannelInboundHandler<DataWrapper> {    private Observable<DataWrapper> generatedData;    private ExecutorService fallbackThread;    DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {        this.fallbackThread = Executors.newSingleThreadExecutor();        this.generatedData = generatedData;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        // super.channelRead(ctx, msg);        DataWrapper inComingData = (DataWrapper) msg;        Adapter            .INSTANCE            .appendFromNettworkData(inComingData);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        // super.channelReadComplete(ctx);        ctx.flush();    }    // rest skip}所以我之前提到过我希望它在服务器关闭时接收数据,而不是在服务器关闭时接收数据 ^_^。如果那会帮助 netty 版本 4.1.37 final。
查看完整描述

1 回答

?
汪汪一只猫

TA贡献1898条经验 获得超8个赞

好的,所以未来的人们会面临同样的问题,我自己找到了答案。来自客户端的 Netty 使用后台线程作为通信的主要线程,这意味着我要等待主线程释放,然后它才能对 observable 进行操作。希望它能帮助别人。



查看完整回答
反对 回复 2023-05-17
  • 1 回答
  • 0 关注
  • 115 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信