为了账号安全,请及时绑定邮箱和手机立即绑定

RxJava 是否适合分支工作流?

RxJava 是否适合分支工作流?

莫回无 2021-05-31 11:14:18
我正在使用 RxJava 来处理我们从队列中提取的一些通知。RxJava 似乎在一个简单的工作流中工作得很好,现在随着新需求的出现,流程变得越来越复杂,分支越来越多(请参见下图作为参考) 我试图用一个小单元测试来举例说明流程:@Testpublic void test() {    Observable.range(1, 100)        .groupBy(n -> n % 3)        .toMap(GroupedObservable::getKey)        .flatMap(m1 -> {            Observable<Integer> ones1 = m1.get(0);            Observable<Integer> twos1 = m1.get(1).map(n -> n - 10);            Observable<Integer> threes = m1.get(2).map(n -> n + 100);            Observable<Integer> onesAndTwos = Observable.merge(ones1, twos1)                .map(n -> n * 3)                .groupBy(n -> n % 2)                .toMap(GroupedObservable::getKey)                .flatMap(m2 -> {                    Observable<Integer> ones2 = m2.get(0).map(n -> n * 10);                    Observable<Integer> twos2 = m2.get(1).map(n -> n * 100);                    return Observable.merge(ones2, twos2);                });                return Observable.merge(onesAndTwos, threes).map(n -> n +1);        })        .subscribe(System.out::println);}虽然使用 RxJava 在技术上仍然可以实现,但我现在想知道它是否是一个不错的选择,因为我必须在 main 中进行 2 级嵌套flatMap,这看起来不太整洁。这是描述上述工作流程的正确方式吗?或者 RxJava 不适合分支工作流?
查看完整描述

2 回答

?
HUX布斯

TA贡献1876条经验 获得超6个赞

分组 observable 是 AFAIK 的正确方法。就个人而言,如果您图片中“按类型拆分”和“合并所有内容”之间的任何内容是异步的,那么在 RX 中执行此操作肯定有很多优点,例如重试逻辑、缓冲、错误处理、背压等。如果它是常规的非异步代码,我猜这是个人喜好。您可以使用 RX 来完成,但您也可以使用常规同步代码在“按类型拆分”和“合并所有内容”之间执行所有操作。

无论您选择哪种方式,拆分代码以使其更具可读性始终是一个好主意,这样您就可以像阅读您附加的图像一样轻松地“阅读流程”。


查看完整回答
反对 回复 2021-06-02
?
牛魔王的故事

TA贡献1830条经验 获得超3个赞

只是另一种可能适合您的方法的想法:您可以多播源并单独处理分支,而不是分组/toMap。


例子:


@Test

public void multicastingShare() {

    final Observable<Integer> sharedSource = Observable.range(1, 10)

            .doOnSubscribe(dummy -> System.out.println("subscribed"))

            .share();

    // split by some criteria

    final Observable<String> oddItems = sharedSource

            .filter(n -> n % 2 == 1)

            .map(odd -> "odd: " + odd)

            .doOnNext(System.out::println);

    final Observable<String> evenItems = sharedSource

            .filter(n -> n % 2 == 0)

            .map(even -> "even: " + even)

            .doOnNext(System.out::println);


    // recombine the individual streams at some point

    Observable.concat(oddItems, evenItems)

            .subscribe(result -> System.out.println("result: " + result));

}


查看完整回答
反对 回复 2021-06-02
  • 2 回答
  • 0 关注
  • 309 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信