为了账号安全,请及时绑定邮箱和手机立即绑定

如何对 Flux 进行移动窗口计算并将结果输出为新的 Flux

如何对 Flux 进行移动窗口计算并将结果输出为新的 Flux

神不在的星期二 2022-12-28 16:35:49
我想对 Flux 执行移动窗口计算并生成包含计算值的 Flux,但我不知道如何完成此操作。作为一个简化的例子,假设我有一个整数通量,我想用这个通量中每 3 个连续整数的总和生成一个新的通量。为了显示:第一个通量包含从 1 到 8 的整数:{1, 2, 3, 4, 5, 6, 7, 8}结果通量应包含总和:{1+2+3、2+3+4、3+4+5、4+5+6、5+6+7、6+7+8}我可以轻松地生成第一个 Flux 并导出包含连续 3 个值的通量通量,如下所示:Flux<Integer> f1 = Flux.range(1,8); Flux<Flux<Integer>> f2 = f1.window(3,1);我也可以订阅 () 到 f2 并计算总和,但我不知道如何同时将这些总和发布为新的 Flux。我错过了一些简单的事情,还是这种事情真的很难做?
查看完整描述

1 回答

?
HUWWW

TA贡献1874条经验 获得超12个赞

您可以使用.reduce(Integer::sum)内部通量来执行窗口中元素的总和,并.flatMap使用外部通量将这些总和合并回单个流。


请注意,由于.window调用 with maxSize < skip,因此尾随窗口中的项目将小于最大大小。


Flux<Integer> sums = Flux.range(1, 8)                    // Flux<Integer>

        .window(3, 1)                                    // Flux<Flux<Integer>>

        .flatMap(window -> window.reduce(Integer::sum)); // Flux<Integer>


StepVerifier.create(sums)

        .expectNext(6)  // 1+2+3

        .expectNext(9)  // 2+3+4

        .expectNext(12) // 3+4+5

        .expectNext(15) // 4+5+6

        .expectNext(18) // 5+6+7

        .expectNext(21) // 6+7+8

        .expectNext(15) // 7+8

        .expectNext(8)  // 8

        .verifyComplete();


查看完整回答
反对 回复 2022-12-28
  • 1 回答
  • 0 关注
  • 67 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信