为了账号安全,请及时绑定邮箱和手机立即绑定

如何从 aysnc 回调创建多个 Flux

如何从 aysnc 回调创建多个 Flux

一只名叫tom的猫 2022-05-25 10:57:06
从 Reactor 的参考指南中,我了解到Flux.create()可用于将 aysnc 回调转换为Flux.但是,有时回调有多种方法来接收多种类型的数据,假设我有如下代码:asrService.recognize(new Callback() {    @Override    public void stateChange(State state) {        // consume state    }    @Override    public void onResultData(Result result) {        // consume result    }});如何将其转换为两个反应流:Flux<State>和Flux<Result>?
查看完整描述

2 回答

?
暮色呼如

TA贡献1853条经验 获得超9个赞

一种方法是使用某些处理器,如 DirectProcessor,您可以创建 2 个不同的处理器,并在事件向处理器发出项目并订阅处理器时,但如果您仍想使用 Flux.create,您可以这样做


    Flux<Object> objectFlux;


@Override

public void run(String... args) throws Exception {


    objectFlux = Flux.create(objectFluxSink ->

            asrService.recognize(new Callback() {

                @Override

                public void stateChange(State state) {

                    objectFluxSink.next(state);

                }


                @Override

                public void onResultData(Result result) {

                    objectFluxSink.next(state);

                }

            }));






}


public Flux<Result> getResult(){

 return    objectFlux.filter(o -> o instanceof Result)

            .map(o -> ((Result)o));

}


public Flux<State> geState(){

    return    objectFlux.filter(o -> o instanceof State)

            .map(o -> ((State)o));

}

我仍然认为使用处理器应该更清洁,你不需要做那个过滤和铸造,但你需要有 2 个这样的处理器:


        DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();

    DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();

    asrService.recognize(new Callback() {

        @Override

        public void stateChange(State state) {

            stateDirectProcessor.onNext(state);

        }


        @Override

        public void onResultData(Result result) {

            resultDirectProcessor.onNext(result);

        }

    });


查看完整回答
反对 回复 2022-05-25
?
ABOUTYOU

TA贡献1812条经验 获得超5个赞

只是可用于给定任务的小片段。


Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

Sinks.EmitResult result = sink.tryEmitNext("some string");


查看完整回答
反对 回复 2022-05-25
  • 2 回答
  • 0 关注
  • 87 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信