为了账号安全,请及时绑定邮箱和手机立即绑定

如何收集顺序调用异步 API 的结果?

如何收集顺序调用异步 API 的结果?

森栏 2021-09-03 16:29:55
我有一个异步 API,它基本上通过分页返回结果public CompletableFuture<Response> getNext(int startFrom);每个Response对象包含一个偏移量列表startFrom和一个标志,指示是否还有更多元素剩余,因此,另一个getNext()请求。我想编写一个遍历所有页面并检索所有偏移量的方法。我可以像这样以同步方式编写它int startFrom = 0;List<Integer> offsets = new ArrayList<>();for (;;) {    CompletableFuture<Response> future = getNext(startFrom);    Response response = future.get(); // an exception stops everything    if (response.getOffsets().isEmpty()) {        break; // we're done    }    offsets.addAll(response.getOffsets());    if (!response.hasMore()) {        break; // we're done    }    startFrom = getLast(response.getOffsets());}换句话说,我们在 0 处调用getNext()with startFrom。如果抛出异常,我们将整个过程短路。否则,如果没有偏移,我们就完成了。如果有偏移量,我们将它们添加到主列表中。如果没有更多可取的,我们就完成了。否则,我们将 重置为startFrom我们获取并重复的最后一个偏移量。理想情况下,我希望在不阻塞CompletableFuture::get()并返回CompletableFuture<List<Integer>>包含所有偏移量的情况下执行此操作。我怎样才能做到这一点?我怎样才能组成期货来收集他们的结果?我正在考虑“递归”(实际上不是在执行中,而是在代码中)private CompletableFuture<List<Integer>> recur(int startFrom, List<Integer> offsets) {    CompletableFuture<Response> future = getNext(startFrom);    return future.thenCompose((response) -> {        if (response.getOffsets().isEmpty()) {            return CompletableFuture.completedFuture(offsets);        }        offsets.addAll(response.getOffsets());        if (!response.hasMore()) {            return CompletableFuture.completedFuture(offsets);        }        return recur(getLast(response.getOffsets()), offsets);    });}public CompletableFuture<List<Integer>> getAll() {    List<Integer> offsets = new ArrayList<>();    return recur(0, offsets);}从复杂性的角度来看,我不喜欢这个。我们能做得更好吗?
查看完整描述

2 回答

?
慕桂英3389331

TA贡献2036条经验 获得超8个赞

我还想尝试一下EA Async,因为它实现了对async/await 的Java 支持(受 C# 启发)。所以我只是拿了你的初始代码,并转换了它:


public CompletableFuture<List<Integer>> getAllEaAsync() {

    int startFrom = 0;

    List<Integer> offsets = new ArrayList<>();


    for (;;) {

        // this is the only thing I changed!

        Response response = Async.await(getNext(startFrom));

        if (response.getOffsets().isEmpty()) {

            break; // we're done

        }

        offsets.addAll(response.getOffsets());

        if (!response.hasMore()) {

            break; // we're done

        }

        startFrom = getLast(response.getOffsets());

    }


    // well, you also have to wrap your result in a future to make it compilable

    return CompletableFuture.completedFuture(offsets);

}

然后您必须检测您的代码,例如通过添加


Async.init();

在你的main()方法开始时。


我必须说:这看起来真的很神奇!


在幕后,EA异步注意到还有一个Async.await()方法中调用,而重写能够处理所有的thenCompose()/ thenApply()/递归你。唯一的要求是您的方法必须返回一个CompletionStage或CompletableFuture。


这真的是异步代码变得简单!


查看完整回答
反对 回复 2021-09-03
?
慕森王

TA贡献1777条经验 获得超3个赞

为了练习,我制作了这个算法的通用版本,但它相当复杂,因为你需要:

  1. 调用服务的初始值 (the startFrom)

  2. 服务调用本身 ( getNext())

  3. 用于累积中间值的结果容器 (the offsets)

  4. 一个累加器 ( offsets.addAll(response.getOffsets()))

  5. 执行“递归”的条件 ( response.hasMore())

  6. 计算下一个输入的函数 ( getLast(response.getOffsets()))

所以这给出了:

public <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,

        Function<T, CompletableFuture<I>> service,

        BiConsumer<R, I> accumulator,

        Predicate<I> continueRecursion,

        Function<I, T> nextInput) {

    return service.apply(initialInput)

            .thenCompose(response -> {

                accumulator.accept(resultContainer, response);

                if (continueRecursion.test(response)) {

                    return recur(nextInput.apply(response),

                            resultContainer, service, accumulator,

                            continueRecursion, nextInput);

                } else {

                    return CompletableFuture.completedFuture(resultContainer);

                }

            });

}


public CompletableFuture<List<Integer>> getAll() {

    return recur(0, new ArrayList<>(), this::getNext,

            (list, response) -> list.addAll(response.getOffsets()),

            Response::hasMore,

            r -> getLast(r.getOffsets()));

}

的一小的简化recur()是可能通过替换initialInput由所述CompletableFuture由所述第一呼叫的结果返回时,resultContainer并且accumulator可以被合并成一个单一的Consumer和service然后可以用合并后的nextInput功能。


但这有点复杂getAll():


private <I> CompletableFuture<Void> recur(CompletableFuture<I> future,

        Consumer<I> accumulator,

        Predicate<I> continueRecursion,

        Function<I, CompletableFuture<I>> service) {

    return future.thenCompose(result -> {

        accumulator.accept(result);

        if (continueRecursion.test(result)) {

            return recur(service.apply(result), accumulator, continueRecursion, service);

        } else {

            return CompletableFuture.completedFuture(null);

        }

    });

}


public CompletableFuture<List<Integer>> getAll() {

    ArrayList<Integer> resultContainer = new ArrayList<>();

    return recur(getNext(0),

            result -> resultContainer.addAll(result.getOffsets()),

            Response::hasMore,

            r -> getNext(getLast(r.getOffsets())))

            .thenApply(unused -> resultContainer);

}


查看完整回答
反对 回复 2021-09-03
  • 2 回答
  • 0 关注
  • 121 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信