为了账号安全,请及时绑定邮箱和手机立即绑定

当ForkJoinPool结束时

当ForkJoinPool结束时

慕侠2389804 2023-06-21 13:15:03
我试图弄清楚所有 ForkJoinPool 线程何时完成其任务。我编写了这个测试应用程序(我使用 System.out 因为它只是一个快速测试应用程序,并且没有错误检查/处理):public class TestForkJoinPoolEnd {    private static final Queue<String> queue = new LinkedList<>();    private static final int MAX_SIZE = 5000;    private static final int SPEED_UP = 100;    public static void main(String[] args) {        ForkJoinPool customThreadPool = new ForkJoinPool(12);        customThreadPool.submit(                () -> makeList()                        .parallelStream()                        .forEach(TestForkJoinPoolEnd::process));        enqueue("Theard pool started up");        int counter = MAX_SIZE + 1;        while (!customThreadPool.isTerminating()) {            String s = dequeue();            if (s != null) {                System.out.println(s);                counter--;            }            try {                TimeUnit.MILLISECONDS.sleep(1);            } catch (InterruptedException e) {            }        }        System.out.println("counter = " + counter);        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +                "= " + customThreadPool.isTerminating() + " isTerminated = "                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());    }    static List<String> makeList() {        return Stream.generate(() -> makeString())                .limit(MAX_SIZE)                .collect(Collectors.toList());    }    static String makeString() {        int leftLimit = 97; // letter 'a'        int rightLimit = 122; // letter 'z'        int targetStringLength = 10;        Random random = new Random();    }}这段代码被卡住并且永远无法完成。如果我将 while 循环的条件更改为!customThreadPool.isQuiescent()它会终止循环,计数器和队列大小设置为 1。我应该使用什么来确定线程何时完成?
查看完整描述

1 回答

?
LEATH

TA贡献1936条经验 获得超6个赞

AnExecutorService不会仅仅因为一项作业(及其子作业)完成而自行终止。线程池背后的整个想法是可重用。

因此,只有当应用程序调用shutdown()它时,它才会终止。

您可以使用它isQuiescent()来查找是否没有待处理的作业,这仅在所有提交的作业都属于您的特定任务时才有效。使用返回的 future 来submit检查实际工作的完成情况要简洁得多。

在任何一种情况下,排队任务的完成状态都不会说明您正在轮询的队列。当您了解提交结束时,您仍然需要检查队列中是否有未决元素。

此外,建议使用线程安全BlockingQueue实现而不是装饰LinkedListwithsynchronized块。加上一些需要清理的其他内容,代码将如下所示:

public class TestForkJoinPoolEnd {

    private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();

    private static final int MAX_SIZE = 5000;

    private static final int SPEED_UP = 100;


    public static void main(String[] args) {

        ForkJoinPool customThreadPool = new ForkJoinPool(12);

        ForkJoinTask<?> future = customThreadPool.submit(

            () -> makeList()

                    .parallelStream()

                    .forEach(TestForkJoinPoolEnd::process));

        QUEUE.offer("Theard pool started up");


        int counter = MAX_SIZE + 1;

        while (!future.isDone()) try {

            String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);

            if (s != null) {

                System.out.println(s);

                counter--;

            }

        } catch (InterruptedException e) {}


        for(;;) {

            String s = QUEUE.poll();

            if (s == null) break;

            System.out.println(s);

            counter--;

        }

        System.out.println("counter = " + counter);

        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +

                "= " + customThreadPool.isTerminating() + " isTerminated = "

                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());


        customThreadPool.shutdown();

    }


    static List<String> makeList() {

        return IntStream.range(0, MAX_SIZE)

            .mapToObj(i -> makeString())

            .collect(Collectors.toList());

    }


    static String makeString() {

        int targetStringLength = 10;

        Random random = new Random();

        StringBuilder buffer = new StringBuilder(targetStringLength);

        for (int i = 0; i < targetStringLength; i++) {

            int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';

            buffer.append((char) randomLimitedInt);

        }

        return buffer.toString();

    }


    static int toSeed(String s) {

        return s.chars().sum() / SPEED_UP;

    }


    static void process(String s) {

        long start = System.nanoTime();

        try {

            TimeUnit.MILLISECONDS.sleep(toSeed(s));

        } catch (InterruptedException e) {


        }

        long end = System.nanoTime();

        QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");

    }

}

如果您sleep在接收端的呼叫应该模拟一些工作量而不是等待新项目,您也可以使用


int counter = MAX_SIZE + 1;

while (!future.isDone()) {

    String s = QUEUE.poll();

    if (s != null) {

        System.out.println(s);

        counter--;

    }

    try {

        TimeUnit.MILLISECONDS.sleep(1);

    } catch (InterruptedException e) {}

}

但逻辑没有改变。future.isDone()返回后true,我们必须重新检查队列中待处理的元素。我们只保证不会有新的项目到达,而不保证队列已经空了。


作为旁注,该makeString()方法可以进一步改进


static String makeString() {

    int targetStringLength = 10;

    ThreadLocalRandom random = ThreadLocalRandom.current();

    StringBuilder buffer = new StringBuilder(targetStringLength);

    for (int i = 0; i < targetStringLength; i++) {

        int randomLimitedInt = random.nextInt('a', 'z' + 1);

        buffer.append((char)randomLimitedInt);

    }

    return buffer.toString();

}

甚至


static String makeString() {

    int targetStringLength = 10;

    return ThreadLocalRandom.current()

        .ints(targetStringLength, 'a', 'z'+1)

        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)

        .toString();

}


查看完整回答
反对 回复 2023-06-21
  • 1 回答
  • 0 关注
  • 109 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信