2 回答
TA贡献2036条经验 获得超8个赞
我还想尝试一下EA Async,因为它实现了对async/await 的Java 支持(受 C# 启发)。所以我只是拿了你的初始代码,并转换了它:
public CompletableFuture<List<Integer>> getAllEaAsync() {
int startFrom = 0;
List<Integer> offsets = new ArrayList<>();
for (;;) {
// this is the only thing I changed!
Response response = Async.await(getNext(startFrom));
if (response.getOffsets().isEmpty()) {
break; // we're done
}
offsets.addAll(response.getOffsets());
if (!response.hasMore()) {
break; // we're done
}
startFrom = getLast(response.getOffsets());
}
// well, you also have to wrap your result in a future to make it compilable
return CompletableFuture.completedFuture(offsets);
}
然后您必须检测您的代码,例如通过添加
Async.init();
在你的main()方法开始时。
我必须说:这看起来真的很神奇!
在幕后,EA异步注意到还有一个Async.await()方法中调用,而重写能够处理所有的thenCompose()/ thenApply()/递归你。唯一的要求是您的方法必须返回一个CompletionStage或CompletableFuture。
这真的是异步代码变得简单!
TA贡献1777条经验 获得超3个赞
为了练习,我制作了这个算法的通用版本,但它相当复杂,因为你需要:
调用服务的初始值 (the
startFrom
)服务调用本身 (
getNext()
)用于累积中间值的结果容器 (the
offsets
)一个累加器 (
offsets.addAll(response.getOffsets())
)执行“递归”的条件 (
response.hasMore()
)计算下一个输入的函数 (
getLast(response.getOffsets())
)
所以这给出了:
public <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,
Function<T, CompletableFuture<I>> service,
BiConsumer<R, I> accumulator,
Predicate<I> continueRecursion,
Function<I, T> nextInput) {
return service.apply(initialInput)
.thenCompose(response -> {
accumulator.accept(resultContainer, response);
if (continueRecursion.test(response)) {
return recur(nextInput.apply(response),
resultContainer, service, accumulator,
continueRecursion, nextInput);
} else {
return CompletableFuture.completedFuture(resultContainer);
}
});
}
public CompletableFuture<List<Integer>> getAll() {
return recur(0, new ArrayList<>(), this::getNext,
(list, response) -> list.addAll(response.getOffsets()),
Response::hasMore,
r -> getLast(r.getOffsets()));
}
的一小的简化recur()是可能通过替换initialInput由所述CompletableFuture由所述第一呼叫的结果返回时,resultContainer并且accumulator可以被合并成一个单一的Consumer和service然后可以用合并后的nextInput功能。
但这有点复杂getAll():
private <I> CompletableFuture<Void> recur(CompletableFuture<I> future,
Consumer<I> accumulator,
Predicate<I> continueRecursion,
Function<I, CompletableFuture<I>> service) {
return future.thenCompose(result -> {
accumulator.accept(result);
if (continueRecursion.test(result)) {
return recur(service.apply(result), accumulator, continueRecursion, service);
} else {
return CompletableFuture.completedFuture(null);
}
});
}
public CompletableFuture<List<Integer>> getAll() {
ArrayList<Integer> resultContainer = new ArrayList<>();
return recur(getNext(0),
result -> resultContainer.addAll(result.getOffsets()),
Response::hasMore,
r -> getNext(getLast(r.getOffsets())))
.thenApply(unused -> resultContainer);
}
添加回答
举报