添加一些代码来解决问题//generates a sequence in the range from input value (+1) to input value (+9)Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) { return Observable.range(value+1,9) .map(i -> { Log.d(TAG, "Value " + i + " evaluating on " + Thread.currentThread().getName() + " emitting item at " + System.currentTimeMillis()); try { Thread.sleep(delay); } catch (InterruptedException e) { } return new ColoredIntegerModel(i, color); }); }//creates a stream if say input =2 of numbers from 1-20 (input*2) such that the output is 1 (Red color) 2-10(green color) 11 (Red color) 11-20 (Green Color) Observable<ColoredIntegerModel> getEventStream(int value) { return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>() { @Override public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception { for (int i = 0; i < value; ++i) { ColoredIntegerModel model = new ColoredIntegerModel(i*10, Color.RED); emitter.onNext(model); Observable<ColoredIntegerModel> more = getSequenceObservable(i*10, 100, Color.GREEN); more.subscribe(new Consumer<ColoredIntegerModel>() { @Override public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception { emitter.onNext(coloredIntegerModel); } }); } } }); }上面的代码有效。它打印 1(Red) 2-10(Green) 11(Red), 12-20,但我想要一个更干净的解决方案。我也不确定何时可以处理 getEventStream() 中的内部订阅。问题基本上是 getEventStream 为每个发射调用一个函数,该函数也返回一个 Observable 。这类似于一个 Promise 链,其中每个单独的 Promise 可以返回一系列其他 Promise。希望这可以澄清对原始问题的任何混淆。
添加回答
举报
0/150
提交
取消