为了账号安全,请及时绑定邮箱和手机立即绑定

从 CompletableFuture 调用 ExecutorService.shutdownNow

从 CompletableFuture 调用 ExecutorService.shutdownNow

狐的传说 2021-08-04 17:20:41
当已经运行的任务之一引发异常时,我需要取消所有计划但尚未运行的 CompletableFuture 任务。尝试了以下示例,但大多数情况下 main 方法不会退出(可能是由于某种类型的死锁)。public static void main(String[] args) {    ExecutorService executionService = Executors.newFixedThreadPool(5);    Set< CompletableFuture<?> > tasks = new HashSet<>();    for (int i = 0; i < 1000; i++) {        final int id = i;        CompletableFuture<?> c = CompletableFuture        .runAsync( () -> {            System.out.println("Running: " + id);             if ( id == 400 ) throw new RuntimeException("Exception from: " + id);        }, executionService )        .whenComplete( (v, ex) -> {             if ( ex != null ) {                System.out.println("Shutting down.");                executionService.shutdownNow();                System.out.println("shutdown.");            }        } );        tasks.add(c);    }    try{         CompletableFuture.allOf( tasks.stream().toArray(CompletableFuture[]::new) ).join();     }catch(Exception e) {         System.out.println("Got async exception: " + e);     }finally {         System.out.println("DONE");     }        }最后的打印输出是这样的:Running: 402Running: 400Running: 408Running: 407Running: 406Running: 405Running: 411Shutting down.Running: 410Running: 409Running: 413Running: 412shutdown.尝试shutdownNow在单独的线程上运行方法,但在大多数情况下,它仍然会出现相同的死锁。知道什么可能导致这种僵局吗?您认为在CompletableFuture抛出异常时取消所有已安排但尚未运行的s的最佳方法是什么?正在考虑迭代tasks并调用cancel每个CompletableFuture. 但我不喜欢这个的是CancellationException从join.
查看完整描述

2 回答

?
蓝山帝景

TA贡献1843条经验 获得超7个赞

另一个仅依赖的解决方案CompletableFuture是使用“取消者”未来,这将导致所有未完成的任务在完成时被取消:


Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();

CompletableFuture<Void> canceller = new CompletableFuture<>();


for(int i = 0; i < 1000; i++) {

    if (canceller.isDone()) {

        System.out.println("Canceller invoked, not creating other futures.");

        break;

    }

    //LockSupport.parkNanos(10);

    final int id = i;

    CompletableFuture<?> c = CompletableFuture

            .runAsync(() -> {

                //LockSupport.parkNanos(1000);

                System.out.println("Running: " + id);

                if(id == 400) throw new RuntimeException("Exception from: " + id);

            }, executionService);

    c.whenComplete((v, ex) -> {

        if(ex != null) {

            canceller.complete(null);

        }

    });

    tasks.add(c);

}

canceller.thenRun(() -> {

    System.out.println("Cancelling all tasks.");

    tasks.forEach(t -> t.cancel(false));

    System.out.println("Finished cancelling tasks.");

});


查看完整回答
反对 回复 2021-08-04
  • 2 回答
  • 0 关注
  • 194 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信