我从未使用过 ForkJoinPool,我遇到了这个代码片段。我有一个Set<Document> docs. 文档有写方法。如果我执行以下操作,是否需要使用 get 或 join 来确保集合中的所有文档都正确完成了它们的写入方法?ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);pool.submit(() -> docs.parallelStream().forEach( doc -> { doc.write(); }));如果其中一个文档无法完成它的写入会发生什么?说它抛出一个异常。给出的代码是否等待所有文档完成其写入操作?
1 回答
呼啦一阵风
TA贡献1802条经验 获得超6个赞
ForkJoinPool.submit(Runnable)
返回一个ForkJoinTask
代表待完成的任务。如果您想等待所有文档被处理,您需要与该任务进行某种形式的同步,例如调用其 get()
方法(从Future
接口)。
关于异常处理,像往常一样,流处理期间的任何异常都会停止它。但是,您必须参考以下文档Stream.forEach(Consumer)
:
此操作的行为明显是不确定的。对于并行流管道,此操作不保证遵守流的遇到顺序,因为这样做会牺牲并行性的好处。对于任何给定的元素,可以在库选择的任何时间和线程中执行该操作。[…]
这意味着如果发生异常,您无法保证将写入哪个文档。处理将停止,但您无法控制仍将处理哪个文档。
如果您想确保处理剩余的文件,我会建议两种解决方案:
document.write()
用try
/包围catch
以确保没有异常传播,但这使得很难检查哪个文档成功或根本没有失败;要么使用另一种解决方案来管理您的并行处理,例如
CompletableFuture
API。正如评论中所指出的,您当前的解决方案是一种由于实现细节而起作用的黑客,因此最好做一些更清洁的事情。
使用CompletableFuture
,你可以这样做:
List<CompletableFuture<Void>> futures = docs.stream() .map(doc -> CompletableFuture.runAsync(doc::write, pool)) .collect(Collectors.toList());
这将确保处理所有文档,并检查返回列表中的每个未来是否成功。
添加回答
举报
0/150
提交
取消