为了账号安全,请及时绑定邮箱和手机立即绑定

Java ForkJoinPool - 队列中的任务顺序

Java ForkJoinPool - 队列中的任务顺序

呼啦一阵风 2021-09-29 16:32:05
我想了解在 Java fork-join 池中处理任务的顺序。到目前为止,我在文档中找到的唯一相关信息是关于名为“asyncMode”的参数,“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真” .我对这句话的理解是每个worker都有自己的任务队列;工作人员从自己队列的前面获取任务,如果自己的队列为空,则从其他工作人员的队列后面窃取任务;如果 asyncMode 为真(相应的假),工作人员会将新分叉的任务添加到他们自己队列的后面(相应的前面)。如果我的解释有误,请纠正我!现在,这提出了几个问题:1)加入的分叉任务的顺序是什么?我的猜测是,当一个任务被分叉时,它会被添加到工作人员的队列中,如我上面的解释中所述。现在,假设任务已加入...如果在调用 join 时任务尚未启动,则调用 join 的 worker 会将任务从队列中拉出并立即开始处理它。如果在调用 join 时该任务已被另一个 worker 窃取,则调用 join 的 worker 将同时处理其他任务(按照我上面解释中描述的获取任务的顺序),直到它被加入已经被偷走它的工人完成了。这个猜测是基于用打印语句编写简单的测试代码,并观察改变连接调用顺序影响任务处理顺序的方式。有人可以告诉我我的猜测是否正确?2) 外部提交的任务排序是什么?根据这个问题的答案,fork-join 池不使用外部队列。(顺便说一下,我正在使用 Java 8。)那么我是否理解在外部提交任务时,该任务会被添加到随机选择的工作队列中?如果是,外部提交的任务是加在队列的后面还是前面?最后,这取决于任务是通过调用 pool.execute(task) 还是通过调用 pool.invoke(task) 提交?这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此 fork-join 池中的线程?
查看完整描述

1 回答

?
皈依舞

TA贡献1851条经验 获得超3个赞

你的猜测是正确的,你完全正确。正如您在“实施概述”中所读到的那样。

 * Joining Tasks

 * =============

 *

 * Any of several actions may be taken when one worker is waiting

 * to join a task stolen (or always held) by another.  Because we

 * are multiplexing many tasks on to a pool of workers, we can't

 * just let them block (as in Thread.join).  We also cannot just

 * reassign the joiner's run-time stack with another and replace

 * it later, which would be a form of "continuation", that even if

 * possible is not necessarily a good idea since we may need both

 * an unblocked task and its continuation to progress.  Instead we

 * combine two tactics:

 *

 *   Helping: Arranging for the joiner to execute some task that it

 *      would be running if the steal had not occurred.

 *

 *   Compensating: Unless there are already enough live threads,

 *      method tryCompensate() may create or re-activate a spare

 *      thread to compensate for blocked joiners until they unblock.

2.ForkJoinPool.invoke和ForkJoinPool.join在提交任务的方式上是完全一样的。你可以在代码中看到


    public <T> T invoke(ForkJoinTask<T> task) {

        if (task == null)

            throw new NullPointerException();

        externalPush(task);

        return task.join();

    }

    public void execute(ForkJoinTask<?> task) {

        if (task == null)

            throw new NullPointerException();

        externalPush(task);

    }

在 externalPush 中,您可以看到使用 ThreadLocalRandom 将任务添加到随机选择的工作队列中。此外,它使用推送方法进入队列的头部。


    final void externalPush(ForkJoinTask<?> task) {

        WorkQueue[] ws; WorkQueue q; int m;

        int r = ThreadLocalRandom.getProbe();

        int rs = runState;

        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&

            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&

            U.compareAndSwapInt(q, QLOCK, 0, 1)) {

            ForkJoinTask<?>[] a; int am, n, s;

            if ((a = q.array) != null &&

                (am = a.length - 1) > (n = (s = q.top) - q.base)) {

                    int j = ((am & s) << ASHIFT) + ABASE;

                U.putOrderedObject(a, j, task);

                U.putOrderedInt(q, QTOP, s + 1);

                U.putIntVolatile(q, QLOCK, 0);

                if (n <= 1)

                    signalWork(ws, q);

                return;

            }

            U.compareAndSwapInt(q, QLOCK, 1, 0);

        }

        externalSubmit(task);

    }

我不确定你的意思是什么:


这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此 fork-join 池中的线程?


查看完整回答
反对 回复 2021-09-29
  • 1 回答
  • 0 关注
  • 285 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信