为了账号安全,请及时绑定邮箱和手机立即绑定

如何将轮询 api 转换为反应流

如何将轮询 api 转换为反应流

慕哥6287543 2023-03-31 09:34:32
假设我有一个具有以下签名的函数:class Item {  String name;  Long id;}public Flux<Item> getNew(long id);getNew()返回在 id (0..N) 之后添加的项目流。那么如何将其变成无限流呢?所以像这样:public Flux<Item> observe(long id) {    return Flux.interval(Duration.ofSeconds(1)).             flatMap(counter -> getNew(id)); // <-- how to use last value from getNew flux as the new id                }我能够做到的唯一方法是使用某种类型的状态变量:   public Flux<Long> observe(long id) {     final AtomicLong last = new AtomicLong(id);     return Flux.interval(Duration.ofSeconds(1)).         flatMap(l -> getNew(last.get())).         doOnNext(last::set);       }    有没有更惯用的方法来做到这一点?我试图为此创建生成器,但我不知道如何实现它。
查看完整描述

1 回答

?
富国沪深

TA贡献1790条经验 获得超9个赞

如果您可以通过检查识别出最后Item发出的信号,则可以使用运算符:getNew.expand

   public Flux<Item> observe(long id) {

        return getNew(id)

                .expand(item -> isLast(item)

                        ? getNew(item.id)

                        : Flux.empty());

    }

    /**

     * @return true if the given item is the last item emitted by getNew

     */

    private boolean isLast(Item item) {

        return // ... snip ...

    }

如果您不能通过检查来识别最后一个Item,那么您将不得不使用状态变量。虽然,我建议使用.deferand.repeat而不是 .interval...


 public Flux<Item> observe(long id) {

        final AtomicLong nextStartId = new AtomicLong(id);

        return Flux.defer(() -> getNew(nextStartId.get()))

                .doOnNext(item -> nextStartId.set(item.id))

                .repeat();

    }

反对使用的主要原因.interval是:

如果没有及时产生需求,将发出 onError 信号

因此,如果 API 花费的时间太长,或者处理结果的时间太长,流将以错误结束。对于较长的间隔,这可能不是问题,但对于相对较短的间隔(例如您的示例中的 1 秒),这可能是一个问题。

如果你想在每次重复迭代之前延迟,那么你可以使用.repeatWhen, 带有 reactor-extra 的Repeat固定退避。这将为您提供“固定延迟”语义,而不是“固定间隔”。


查看完整回答
反对 回复 2023-03-31
  • 1 回答
  • 0 关注
  • 113 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信