为了账号安全,请及时绑定邮箱和手机立即绑定

有没有办法部分控制 Java 并行流的顺序?

有没有办法部分控制 Java 并行流的顺序?

三国纷争 2022-10-26 16:43:01
我知道尝试使并行流以特定顺序执行每个元素是没有意义的。由于它并行运行数据,因此排序中显然会有一些不确定性。但是,我想知道是否有可能让它按顺序执行“排序”,或者至少尝试保持顺序有点类似于如果它是顺序的。用例我需要对来自几个数组的值的每个组合执行一些代码。我创建了一个包含所有可能索引组合的流,如下所示(为了不泄露专有信息,变量的名称已被混淆,我保证我通常不会命名我的变量arr1等arr2):public static void doMyComputation(double[] arr1, double[] arr2, double[] arr3) {  DoubleStream.of(arr1).mapToObj(Double::valueOf)    .flatMap(      i1->DoubleStream.of(arr2).mapToObj(Double::valueOf)        .flatMap(          i2->DoubleStream.of(arr3).mapToObj(Double::valueOf)            .flatMap(              i3->new Inputs(i1,i2,i3)             )        )    )    .parallel()    .forEach(input -> doComputationallyIntensiveThing(input.i1, input.i2, input.i3);这很好用(或者至少真实版本可以,我为我在此处发布的代码片段简化了一些事情,所以我可能把代码片段弄乱了)。我希望由于并行性,我不会看到 order 中的值arr1[0], arr2[0], arr3[0],其次是arr1[0], arr2[0], arr3[1]等等。但是,我希望我至少会看到从arr1第一个开始的前几个值的输入,然后慢慢工作我走到尽头的路arr1。我惊讶地发现它甚至没有接近那个。问题在于,在该doComputationallyIntensiveThing方法中,只有当我们同时看到许多相同的值时,才会有一些缓存表现良好arr1。如果这些值是完全随机输入的,那么缓存弊大于利。有什么方法可以提示流以将输入按中的值组合在一起的顺序执行输入arr1?如果没有,那么我可能只为每个值创建一个新流,arr1它会正常工作,但我想看看是否有一种方法可以在一个流中完成这一切。
查看完整描述

2 回答

?
智慧大石

TA贡献1946条经验 获得超3个赞

通常,您不应该假设并行流的特定处理顺序,而是假设您的算法是正确的,无论实际处理顺序如何,您都可以推断顺序和性能之间的关系。

Stream 实现已经被设计为允许从处理连续元素中受益——对于本地处理器。因此,当您有一个包含一百个元素的 Stream 时,例如IntStream.range(0, 100)为了简化,并使用四个原本空闲的 CPU 内核对其进行处理,实现会将其分为四个范围 0-25、25-50、50-75 和 75-100,最好是独立处理。因此,每个处理器将在本地处理连续元素并受益于低级效果,例如一次将多个数组元素提取到其本地缓存中,等等。

因此,您的doComputationallyIntensiveThing方法的问题似乎是缓存(和您的监控)在本地无法正常工作。因此,继续上面的示例,操作将从同时并行执行02550和和。如果第一个评估的四个元素中的任何一个“获胜”并确定缓存的数据,则它将仅适用于接下来的四个值中的一个。如果线程的时间发生变化,比率会变得更糟。751265176

一种解决方案是更改doComputationallyIntensiveThing为使用线程本地缓存,以从每个线程中连续元素的处理中受益。然后,您定义 Stream 操作的方式非常适合此操作,该操作受益于重复查看arr1. 不过,您可以简化代码并消除大量装箱开销:

Arrays.stream(arr1).parallel().forEach(i1 ->
    Arrays.stream(arr2).forEach(i2 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

但是,这带来了之后清理线程本地缓存的挑战,因为并行 Stream 使用了您无法控制的线程池。

一种更简单的解决方法,即今天有效的方法,是更改嵌套:

Arrays.stream(arr2).parallel().forEach(i2 ->
    Arrays.stream(arr1).forEach(i1 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

现在,arr2按照上述方式进行拆分。然后,每个工作线程将对 进行相同的迭代arr1,处理其中的每个元素的次数与 中的元素一样多arr3。这允许利用线程间缓存行为,但存在由于时间差异导致线程不同步的风险,最终会出现与以前相同的情况。

一个更好的选择是重新设计doComputationallyIntensiveThing,创建两种不同的方法,一种为返回包含元素缓存数据的对象的特定元素准备操作arr1,另一种用于使用缓存数据的实际处理:

Arrays.stream(arr1).parallel()
    .mapToObj(i1 -> prepareOperation(i1))
    .forEach(cached ->
        Arrays.stream(arr2).forEach(i2 ->
            Arrays.stream(arr3).forEach(i3 ->
                doComputationallyIntensiveThing(cached, i2, i3))));

在这里,返回的每个实例prepareOperation都与 的特定元素相关联,arr1并充当与其相关联的任何数据的本地缓存,但在特定元素的处理结束时会正常进行垃圾收集。所以不需要清理。

原则上,如果只返回一个空的持有者对象,它也可以工作,由特定元素prepareOperation的第一次调用填充。doComputationallyIntensiveThing


查看完整回答
反对 回复 2022-10-26
?
偶然的你

TA贡献1841条经验 获得超3个赞

为了使代码简单,下面的代码是针对一个数组的(您可以扩展它以包含更多数组)。


    class IteratorSpliteratorOfDouble implements Spliterator.OfDouble {


        private long m_estimate;

        private final DoubleSupplier m_supplier;


        /**

         * @param supplier -- returns Double.NaN if no more elements

         */

        private IteratorSpliteratorOfDouble(final long size,

                final DoubleSupplier supplier) {

            m_estimate = size;

            m_supplier = supplier;

        }


        public IteratorSpliteratorOfDouble(final double[] array) {

            this(array.length, new DoubleSupplier() {


                int m_idx = 0;


                @Override

                public synchronized double getAsDouble() {

                    if (m_idx >= array.length) {

                        return Double.NaN;

                    }


                    return array[m_idx++];

                }

            });

        }


        @Override

        public long estimateSize() {

            return m_estimate;

        }


        @Override

        public int characteristics() {

            return 0;

        }


        @Override

        public boolean tryAdvance(final DoubleConsumer action) {               


            final double next = m_supplier.getAsDouble();

            if (Double.isNaN(next)) {

                return false;

            }


            action.accept(next);

            return true;

        }


        @Override

        public Spliterator.OfDouble trySplit() {


            if (m_estimate == 0) {

                return null;

            }

            return new IteratorSpliteratorOfDouble(

                    m_estimate = m_estimate >>> 1, m_supplier);

        }

    }

使用上述的一个例子是:


    final double[] arr = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };

    StreamSupport.doubleStream(new IteratorSpliteratorOfDouble(arr), true)

            .forEach(idx -> doComputationallyIntensiveThing(idx));

该代码将保持数组的元素顺序,同时利用 java 并行。


查看完整回答
反对 回复 2022-10-26
  • 2 回答
  • 0 关注
  • 83 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号