为了账号安全,请及时绑定邮箱和手机立即绑定

在 RxJava 中使用 onErrorReturn 和 retryWhen

在 RxJava 中使用 onErrorReturn 和 retryWhen

慕尼黑8549860 2023-09-20 15:22:10
这是代码:import io.reactivex.Observable;import io.reactivex.Observer;import org.junit.jupiter.api.Test;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.atomic.AtomicBoolean;public class RxJavaTest {    @Test    public void onErr() {        Observable<String> values1 = new Observable<String>() {            @Override            protected void subscribeActual(Observer<? super String> observer) {                observer.onNext("New");                observer.onNext("New1");                observer.onNext("New2");                observer.onNext("New3");                observer.onNext("New4");                if (ThreadLocalRandom                            .current()                            .nextInt(10) == 5) {                    observer.onError(new Exception("don't retry..."));                } else {                    observer.onError(new IllegalArgumentException("retry..."));                }            }        };        AtomicBoolean finished = new AtomicBoolean(false);        values1                .retryWhen(throwableObservable -> throwableObservable                        .takeWhile(throwable -> {                            boolean result = (throwable instanceof IllegalArgumentException);                            if (result) {                                System.out.println("Retry on error: " + throwable);                                return result;                            }                            System.out.println("Error: " + throwable);                            return result;                        })                        .take(20))                .onErrorReturn(throwable -> "Saved the day!")                .doOnTerminate(() -> finished.set(true))                .subscribe(v -> System.out.println(v));    }}目标是仅当存在IllegalArgumentException,对于任何其他异常,立即返回(通过onErrorReturn)。上面的代码实现了第一个目标,但第二个目标失败了,它停止重试,但忽略了该.onErrorReturn部分。知道如何让它发挥作用吗?
查看完整描述

2 回答

?
慕盖茨4494581

TA贡献1850条经验 获得超11个赞

您可以将其更改retryWhen为:


                .retryWhen(throwableObservable ->

                                throwableObservable.flatMap(throwable -> {

                                    if (throwable instanceof IllegalArgumentException) {

                                        System.out.println("Retry on error: " + throwable);

                                        return Observable.just(1);

                                    } else {

                                        System.out.println("Error: " + throwable);

                                        return Observable.<Integer>error(throwable);

                                    }

                                })

                )

为了使其重试,返回哪个值并不重要retryWhen(在上面的示例中返回 1)。根据javadoc:


如果 ObservableSource 调用 onComplete 或 onError,则重试将在子订阅上调用 onComplete 或 onError。否则,此 ObservableSource 将重新订阅源 ObservableSource。


查看完整回答
反对 回复 2023-09-20
?
慕田峪9158850

TA贡献1794条经验 获得超7个赞

作为记录,这是我在看到古斯塔沃的答案之前使用的解决方案onErrorResumeNext:


    private Observable<String> createObservable(long delay) {

        Observable<String> values1 = new Observable<String>() {

            @Override

            protected void subscribeActual(Observer<? super String> observer) {

                observer.onNext("New");

                observer.onNext("New1");

                observer.onNext("New2");

                observer.onNext("New3");

                observer.onNext("New4");

                if (ThreadLocalRandom

                        .current()

                        .nextInt(8) == 2) {

                    observer.onError(new RuntimeException("don't retry..."));

                } else {

                    observer.onError(new IllegalArgumentException("retry..."));

                }

            }

        };

        return Observable.timer(delay, TimeUnit.SECONDS).flatMap(aLong -> values1)

                .onErrorResumeNext((Throwable throwable) -> {

                    if (throwable instanceof IllegalArgumentException) {

                        return createObservable(delay + 2);

                    } else {

                        return Observable.just("The default value");

                    }

                });

    }

这按预期工作,但我认为古斯塔沃建议的方式更容易理解。这是使用以下重写的相同函数retryWhen:


    private Observable<String> createObservable1() {

        Observable<String> values1 = new Observable<String>() {

            @Override

            protected void subscribeActual(Observer<? super String> observer) {

                observer.onNext("New");

                observer.onNext("New1");

                observer.onNext("New2");

                observer.onNext("New3");

                observer.onNext("New4");

                if (ThreadLocalRandom

                        .current()

                        .nextInt(3) == 1) {

                    observer.onError(new RuntimeException("don't retry..."));

                } else {

                    observer.onError(new IllegalArgumentException("retry..."));

                }

            }

        };

        return values1.retryWhen(throwableObservable ->

                throwableObservable

                        .zipWith(Observable.range(1, 5), (throwable, integer) -> {

                            if (throwable instanceof IllegalArgumentException) {

                                System.out.println("Retry on error: " + throwable);

                                return integer;

                            }

                            System.out.println("No retry on error: " + throwable);

                            return -1;

                        })

                        .flatMap(integer -> {

                            if (integer > 0) {

                                System.out.println("Delay " + integer + " sec on retry...");

                                return Observable.timer(integer, TimeUnit.SECONDS);

                            }

                            System.out.println("Return immediately...");

                            return Observable.error(new Exception());

                        })

        ).onErrorReturnItem("Saved the day!");

    }

希望这可以帮助。


查看完整回答
反对 回复 2023-09-20
  • 2 回答
  • 0 关注
  • 122 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信