为了账号安全,请及时绑定邮箱和手机立即绑定

如何在Reactor中进行多线程文件处理

如何在Reactor中进行多线程文件处理

当年话下 2024-01-25 22:05:10
我正在尝试使用 Reactor 的 Flux 并行处理多个文件。主要工作负载发生在对 Flux 的调用中flatMap,然后对 Flux 进行转换和过滤。每当我尝试订阅生成的 Flux 时,主线程都会在收到任何值之前退出。Flux.fromStream(Files.list(Paths.get("directory"))    .flatMap(path -> {         return Flux.create(sink -> {            try (                RandomAccessFile file = new RandomAccessFile(new File(path), "r");                FileChannel fileChannel = file.getChannel()            ) {                // Process file into tokens                sink.next(new Token(".."));            } catch (IOException e) {                sink.error(e);            } finally {                sink.complete();            }        }).subscribeOn(Schedulers.boundedElastic());    })    .map(token -> /* Transform tokens */)    .filter(token -> /* Filter tokens*/)    .subscribe(token -> /* Store tokens in list */)我希望在列表中找到处理管道的输出,但程序立即退出。首先我想知道我是否正确使用 Flux 类,其次我如何等待订阅调用完成?
查看完整描述

1 回答

?
青春有我

TA贡献1784条经验 获得超8个赞

我希望在列表中找到处理管道的输出,但程序立即退出。


您那里的代码在主线程上设置反应链,然后...在主线程上不执行任何其他操作。因此,主线程完成了其工作,并且由于boundedElastic()线程是守护线程,因此没有其他线程阻止程序退出,因此它退出。


您可以通过一个更简单的示例看到相同的行为:


Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)

            .delayElements(Duration.ofMillis(500));

f.subscribe(System.out::println);

您当然可以调用newBoundedElastic("name", false)它使其成为非守护程序支持的调度程序,但是您必须跟踪它并在完成后调用 dispose,所以它实际上只是反转了问题(程序无限运行,直到您处理掉调度程序。)


快速的“n”脏解决方案只是阻止最后一个元素作为Flux程序中的最后一行 - 所以如果我们添加:


f.blockLast();

...然后程序在退出之前等待最后一个元素被发出,我们就得到了我们想要的行为。


对于简单的概念证明来说,这很好。然而,它在“生产”代码中并不理想。首先,“无阻塞”是反应式代码中的一般规则,因此,如果您有这样的阻塞调用,则很难确定它是否是有意的。如果您添加了其他链并希望它们完成,则必须为每个链添加阻塞调用。这很混乱,而且不可持续。


更好的解决方案是使用CountDownLatch:


CountDownLatch cdl = new CountDownLatch(1);


Flux.just(1, 2, 3, 4, 5)

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> cdl.countDown())

        .subscribe(System.out::println);


cdl.await();

这样做的优点是不会显式阻塞,并且还能够同时处理多个发布者(如果将初始值设置为高于 1)。这也往往是我认为通常推荐用于此类事情的方法 -因此,如果您想要最广泛接受的解决方案,那么可能就是这样。


然而,我倾向于支持Phaser所有需要等待多个发布者的示例,而不是只等待一个 - 它的工作方式与 CountdownLatch 类似,但可以动态地register()运行deregister()。这意味着您可以创建单个移相器,然后根据需要轻松向其注册多个发布者,而无需更改初始值,例如:


Phaser phaser = new Phaser(1);


Flux.just(1, 2, 3, 4, 5)

        .doOnSubscribe(s -> phaser.register())

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> phaser.arriveAndDeregister())

        .subscribe(System.out::println);


Flux.just(1, 2, 3, 4, 5, 6, 7, 8)

        .doOnSubscribe(s -> phaser.register())

        .delayElements(Duration.ofMillis(500))

        .doFinally(s -> phaser.arriveAndDeregister())

        .subscribe(System.out::println);


phaser.arriveAndAwaitAdvance();

(当然,如果需要,您也可以将onSubscribe和doFinally逻辑包装在单独的方法中。)


查看完整回答
反对 回复 2024-01-25
  • 1 回答
  • 0 关注
  • 129 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信