1 回答
TA贡献1790条经验 获得超9个赞
如果您可以通过检查识别出最后Item
发出的信号,则可以使用运算符:getNew
.expand
public Flux<Item> observe(long id) {
return getNew(id)
.expand(item -> isLast(item)
? getNew(item.id)
: Flux.empty());
}
/**
* @return true if the given item is the last item emitted by getNew
*/
private boolean isLast(Item item) {
return // ... snip ...
}
如果您不能通过检查来识别最后一个Item
,那么您将不得不使用状态变量。虽然,我建议使用.defer
and.repeat
而不是 .interval
...
public Flux<Item> observe(long id) {
final AtomicLong nextStartId = new AtomicLong(id);
return Flux.defer(() -> getNew(nextStartId.get()))
.doOnNext(item -> nextStartId.set(item.id))
.repeat();
}
反对使用的主要原因.interval
是:
如果没有及时产生需求,将发出 onError 信号
因此,如果 API 花费的时间太长,或者处理结果的时间太长,流将以错误结束。对于较长的间隔,这可能不是问题,但对于相对较短的间隔(例如您的示例中的 1 秒),这可能是一个问题。
如果你想在每次重复迭代之前延迟,那么你可以使用.repeatWhen
, 带有 reactor-extra 的Repeat
固定退避。这将为您提供“固定延迟”语义,而不是“固定间隔”。
添加回答
举报