为了账号安全,请及时绑定邮箱和手机立即绑定

当其中之一抛出异常时如何阻止可运行对象的执行

当其中之一抛出异常时如何阻止可运行对象的执行

千万里不及你 2023-07-19 17:05:51
我有一组元素,对于每个元素,我都执行方法,将其作为 Runnable 传递给 CompletableFuture.runAsync() 。在执行过程中,可能需要停止整个计算,因此我在执行方法之前检查一些条件。如果计算应该停止,那么我会抛出一个异常,该异常在 CompletableFuture 之外处理。我想阻止所有 Runnables 的执行,这些 Runnables 在抛出异常后执行。因此,换句话说,当其中任何一个 CompletableFuture 抛出异常时,我不想等待所有 CompletableFuture 完成。Set elements = ...Executor executor = Executors.newFixedThreadPool(N);try {    CompletableFuture.allOf(elements.stream().map(e - > CompletableFuture.runAsync(() - > {        if (shouldStop()) {            throw new MyException();        }        myMethod(e);    }, executor)).toArray(CompletableFuture[]::new)).join()} catch (CompletionException e) {    ...}
查看完整描述

3 回答

?
慕田峪9158850

TA贡献1794条经验 获得超7个赞

发生异常时全部取消即可。障碍在于您在创建它们时并不了解所有这些,并且您不想多次执行此工作。这可以通过创建一个新的、空的CompletableFuture第一个(我们称之为f1)来解决。然后,像以前一样创建 future,但f1.cancel在if(shouldStop()) { … }语句中插入对 的调用。然后,在创建所有 future 后,将一个操作链接起来,将所有 future 取消f1。


取消将达到两个目的,它将阻止尚未开始的可运行对象的执行,并且将使未来通过不allOf等待仍在进行的评估完成来返回。


由于取消 aCompletableFuture与使用 a 异常完成它没有什么不同CancellationException,并且在出现多个异常的情况下,由 返回的 futureallOf将报告任意一个,我们可以使用自completeExceptionally定义来MyException代替,以确保报告的异常不会是次要的CancellationException。


一个独立的例子是:


static final AtomicInteger STOP = new AtomicInteger(2);

static boolean shouldStop() {

    return STOP.getAndDecrement() <= 0;

}

static final int N = 10;

public static void main(String[] args) {

    Set<Integer> elements = IntStream.range(0, 100).boxed().collect(Collectors.toSet());

    ExecutorService executor = Executors.newFixedThreadPool(N);

    try {

        CompletableFuture<?> cancelAll = new CompletableFuture<>();

        CompletableFuture<?>[] all = elements.stream()

            .map(e ->

                CompletableFuture.runAsync(() -> {

                    System.out.println("entered "+e);

                    if(shouldStop()) {

                        RuntimeException myException = new RuntimeException("stopped");

                         // alternatively cancelAll.cancel(false);

                        cancelAll.completeExceptionally(myException);

                        throw myException;

                    }

                    System.out.println("processing "+e);

                }, executor))

            .toArray(CompletableFuture<?>[]::new);

        cancelAll.whenComplete((value,throwable) -> {

            if(throwable != null) {

                for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);

            }

        });

        CompletableFuture.allOf(all).join();

    } catch (CompletionException e) {

        e.printStackTrace();

    }

    executor.shutdown();

}

这会打印类似的东西


entered 3

entered 8

entered 4

entered 6

entered 1

entered 9

entered 0

entered 7

entered 5

entered 2

entered 10

processing 8

processing 3

java.util.concurrent.CompletionException: java.lang.RuntimeException: stopped

    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)

    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)

    at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1423)

    at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1144)

    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)

    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)

    at CompletableFutureTest.lambda$main$3(CompletableFutureTest.java:34)

    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)

    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)

    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)

    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)

    at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:26)

    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)

    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

    at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.lang.RuntimeException: stopped

    at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:25)

    ... 4 more

显示由于并发性,一些可运行对象已经在运行,但一旦传播取消,就不会启动后续执行。


请注意,由于cancelAll只会在异常情况下完成或根本不会完成,cancelAll.whenComplete((value,throwable) -> { for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable); });因此您可以将链接操作简化为,但这只是编码风格是否保留冗余检查的问题。


您还可以向处理步骤添加延迟,以确保allOf(all).join()在满足停止条件时不会等待完成。


还可以将一个操作链接到返回的 future,runAsync该操作将在任何异常完成时取消所有操作,而不仅仅是显式停止。但是,必须注意返回表示通过 安排的操作的原始未来,runAsync而不是返回的未来whenComplete。


CompletableFuture<?> cancelAll = new CompletableFuture<>();

CompletableFuture<?>[] all = elements.stream()

    .map(e -> {

        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {

            System.out.println("entered "+e);

            if(shouldStop()) throw new RuntimeException("stopped");

            System.out.println("processing "+e);

        }, executor);

        cf.whenComplete((value,throwable) -> {

            if(throwable != null) cancelAll.completeExceptionally(throwable);

        });

        return cf;

    })

    .toArray(CompletableFuture<?>[]::new);

cancelAll.whenComplete((value,throwable) -> {

    for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);

});

CompletableFuture.allOf(all).join();


查看完整回答
反对 回复 2023-07-19
?
青春有我

TA贡献1784条经验 获得超8个赞

我对 s 没有太多(当然没有!)经验CompletableFuture,但我确实有一个建议(可能有帮助?)你可以在CompletableFuture.allOf(elements.stream().maptry 块外部声明 lambda 吗?这样,在尝试内部之前,所有期货都不会运行。但它们仍然可以被 catch 块访问。在其中您可以完成cancel所有这些。



查看完整回答
反对 回复 2023-07-19
?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

您应该做的主要事情是interrupt希望更快地终止所有正在运行的任务,这意味着这些任务可能需要检查中断,以便它们知道停止正在做的事情并更快地终止。


此外,您可以在主线程中继续并让它们在后台终止,而不是等待被中断的任务实际终止。


public static void main(String[] args) {

    List<Integer> elements = Arrays.asList(5, null, 6, 3, 4); // these elements will fail fast

    // List<Integer> elements = Arrays.asList(5, 2, 6, 3, 4); // these elements will succeed


    try {

        CountDownLatch latch = new CountDownLatch(elements.size());

        ExecutorService executor = Executors.newFixedThreadPool(elements.size());

        elements.stream().forEach(e -> {

            executor.execute(() -> {

                try {

                    doSomething(e);

                    latch.countDown();

                } catch (Exception ex) {

                    // shutdown executor ASAP on exception, read the docs for `shutdownNow()`

                    // it will interrupt all tasks in the executor

                    if (!executor.isShutdown()) {

                        executor.shutdownNow();

                    }

                    for (int i = (int) latch.getCount(); i >= 0; i--) {

                        latch.countDown();

                    }

                    // log the exception

                    ex.printStackTrace(System.out);

                }

            });

        });

        latch.await();

        if (executor.isShutdown()) {

            System.out.println("Tasks failed! Terminating remaining tasks in the background.");

        } else {

            executor.shutdown();

            System.out.println("Tasks succeeded!");

        }

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

}


public static void doSomething(Integer sleepSecs) {

    // You will want to check for `interrupted()` throughout the method you want to be able to cancel

    if (Thread.interrupted()) {

        System.out.println(Thread.currentThread().getName() + " interrupted early");

        return;

    }


    if (sleepSecs == null) {

        System.out.println(Thread.currentThread().getName() + " throwing exception ");

        throw new RuntimeException();

    }


    try {

        System.out.println(Thread.currentThread().getName() + " started interruptable sleep for " + sleepSecs + "s");

        Thread.sleep(sleepSecs * 1000);

        System.out.println(Thread.currentThread().getName() + " finished interruptable sleep" + sleepSecs + "s");

    } catch (InterruptedException e) {

        System.out.println(Thread.currentThread().getName() + " interrupted sleep!");

    }


    // ...possibly some part of the task that can't be skipped, such as cleanup


    System.out.println(Thread.currentThread().getName() + " complete!");

}


查看完整回答
反对 回复 2023-07-19
  • 3 回答
  • 0 关注
  • 129 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信