为了账号安全,请及时绑定邮箱和手机立即绑定

在一组文档上使用 ForkJoinPool

在一组文档上使用 ForkJoinPool

精慕HU 2021-08-04 10:15:28
我从未使用过 ForkJoinPool,我遇到了这个代码片段。我有一个Set<Document> docs. 文档有写方法。如果我执行以下操作,是否需要使用 get 或 join 来确保集合中的所有文档都正确完成了它们的写入方法?ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);pool.submit(() -> docs.parallelStream().forEach(    doc -> {        doc.write();    }));如果其中一个文档无法完成它的写入会发生什么?说它抛出一个异常。给出的代码是否等待所有文档完成其写入操作?
查看完整描述

1 回答

?
呼啦一阵风

TA贡献1802条经验 获得超6个赞

ForkJoinPool.submit(Runnable)返回一个ForkJoinTask代表待完成的任务。如果您想等待所有文档被处理,您需要与该任务进行某种形式的同步,例如调用其 get()方法(从Future接口)。

关于异常处理,像往常一样,流处理期间的任何异常都会停止它。但是,您必须参考以下文档Stream.forEach(Consumer)

此操作的行为明显是不确定的。对于并行流管道,此操作不保证遵守流的遇到顺序,因为这样做会牺牲并行性的好处。对于任何给定的元素,可以在库选择的任何时间和线程中执行该操作。[…]

这意味着如果发生异常,您无法保证将写入哪个文档。处理将停止,但您无法控制仍将处理哪个文档。

如果您想确保处理剩余的文件,我会建议两种解决方案:

  • document.write()try/包围catch以确保没有异常传播,但这使得很难检查哪个文档成功或根本没有失败;要么

  • 使用另一种解决方案来管理您的并行处理,例如CompletableFutureAPI。正如评论中所指出的,您当前的解决方案是一种由于实现细节而起作用的黑客,因此最好做一些更清洁的事情。

使用CompletableFuture,你可以这样做:

List<CompletableFuture<Void>> futures = docs.stream()
                    .map(doc -> CompletableFuture.runAsync(doc::write, pool))
                    .collect(Collectors.toList());

这将确保处理所有文档,并检查返回列表中的每个未来是否成功。


查看完整回答
反对 回复 2021-08-04
  • 1 回答
  • 0 关注
  • 179 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信