为了账号安全,请及时绑定邮箱和手机立即绑定

使用组的通量并行串行执行

使用组的通量并行串行执行

GCT1015 2022-09-28 15:31:34
假设我有这个:Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)   .groupBy(i -> i % 3);并说我有一个方法:Mono<Integer> getFromService(Integer i);我想为每个组并行呼叫,但请确保每个组中的呼叫是串行的。getFromService对于上面的示例,这将是具有这些输入值的三个并行流:stream 1: 0 -> 3 -> 6 -> 9stream 2: 1 -> 4 -> 7 -> 10stream 3: 2 -> 5 -> 8 -> 11我试过这个,但它没有做我想做的事:Flux.range(0, 12)   .groupBy(i -> i % 3)   .flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))这是一次并行调用所有整数的服务。我该如何继续?
查看完整描述

1 回答

?
慕斯709654

TA贡献1840条经验 获得超5个赞

使用“连接映射”“平面映射”序列性而不是内部.flatMap


如果要在每个组中按顺序执行(即每个组中一次只有一个订阅),请使用 ,如下所示:getFromService.concatMap

Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))

如果组内的并行执行是可以的,但您只关心序列的发出顺序,则使用 ,如下所示:flatMapSequential

Flux.range(0, 12)
    .groupBy(i -> i % 3)
    .flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))

另一种选择是将参数设置为 使用,但我建议使用上述方法之一。.flatMapconcurrency1



查看完整回答
反对 回复 2022-09-28
  • 1 回答
  • 0 关注
  • 83 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信