2 回答

TA贡献1946条经验 获得超3个赞
通常,您不应该假设并行流的特定处理顺序,而是假设您的算法是正确的,无论实际处理顺序如何,您都可以推断顺序和性能之间的关系。
Stream 实现已经被设计为允许从处理连续元素中受益——对于本地处理器。因此,当您有一个包含一百个元素的 Stream 时,例如IntStream.range(0, 100)
为了简化,并使用四个原本空闲的 CPU 内核对其进行处理,实现会将其分为四个范围 0-25、25-50、50-75 和 75-100,最好是独立处理。因此,每个处理器将在本地处理连续元素并受益于低级效果,例如一次将多个数组元素提取到其本地缓存中,等等。
因此,您的doComputationallyIntensiveThing
方法的问题似乎是缓存(和您的监控)在本地无法正常工作。因此,继续上面的示例,操作将从同时并行执行0
、25
、50
和和。如果第一个评估的四个元素中的任何一个“获胜”并确定缓存的数据,则它将仅适用于接下来的四个值中的一个。如果线程的时间发生变化,比率会变得更糟。75
1
26
51
76
一种解决方案是更改doComputationallyIntensiveThing
为使用线程本地缓存,以从每个线程中连续元素的处理中受益。然后,您定义 Stream 操作的方式非常适合此操作,该操作受益于重复查看arr1
. 不过,您可以简化代码并消除大量装箱开销:
Arrays.stream(arr1).parallel().forEach(i1 -> Arrays.stream(arr2).forEach(i2 -> Arrays.stream(arr3).forEach(i3 -> doComputationallyIntensiveThing(i1, i2, i3))));
但是,这带来了之后清理线程本地缓存的挑战,因为并行 Stream 使用了您无法控制的线程池。
一种更简单的解决方法,即今天有效的方法,是更改嵌套:
Arrays.stream(arr2).parallel().forEach(i2 -> Arrays.stream(arr1).forEach(i1 -> Arrays.stream(arr3).forEach(i3 -> doComputationallyIntensiveThing(i1, i2, i3))));
现在,arr2
按照上述方式进行拆分。然后,每个工作线程将对 进行相同的迭代arr1
,处理其中的每个元素的次数与 中的元素一样多arr3
。这允许利用线程间缓存行为,但存在由于时间差异导致线程不同步的风险,最终会出现与以前相同的情况。
一个更好的选择是重新设计doComputationallyIntensiveThing
,创建两种不同的方法,一种为返回包含元素缓存数据的对象的特定元素准备操作arr1
,另一种用于使用缓存数据的实际处理:
Arrays.stream(arr1).parallel() .mapToObj(i1 -> prepareOperation(i1)) .forEach(cached -> Arrays.stream(arr2).forEach(i2 -> Arrays.stream(arr3).forEach(i3 -> doComputationallyIntensiveThing(cached, i2, i3))));
在这里,返回的每个实例prepareOperation
都与 的特定元素相关联,arr1
并充当与其相关联的任何数据的本地缓存,但在特定元素的处理结束时会正常进行垃圾收集。所以不需要清理。
原则上,如果只返回一个空的持有者对象,它也可以工作,由特定元素prepareOperation
的第一次调用填充。doComputationallyIntensiveThing

TA贡献1841条经验 获得超3个赞
为了使代码简单,下面的代码是针对一个数组的(您可以扩展它以包含更多数组)。
class IteratorSpliteratorOfDouble implements Spliterator.OfDouble {
private long m_estimate;
private final DoubleSupplier m_supplier;
/**
* @param supplier -- returns Double.NaN if no more elements
*/
private IteratorSpliteratorOfDouble(final long size,
final DoubleSupplier supplier) {
m_estimate = size;
m_supplier = supplier;
}
public IteratorSpliteratorOfDouble(final double[] array) {
this(array.length, new DoubleSupplier() {
int m_idx = 0;
@Override
public synchronized double getAsDouble() {
if (m_idx >= array.length) {
return Double.NaN;
}
return array[m_idx++];
}
});
}
@Override
public long estimateSize() {
return m_estimate;
}
@Override
public int characteristics() {
return 0;
}
@Override
public boolean tryAdvance(final DoubleConsumer action) {
final double next = m_supplier.getAsDouble();
if (Double.isNaN(next)) {
return false;
}
action.accept(next);
return true;
}
@Override
public Spliterator.OfDouble trySplit() {
if (m_estimate == 0) {
return null;
}
return new IteratorSpliteratorOfDouble(
m_estimate = m_estimate >>> 1, m_supplier);
}
}
使用上述的一个例子是:
final double[] arr = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
StreamSupport.doubleStream(new IteratorSpliteratorOfDouble(arr), true)
.forEach(idx -> doComputationallyIntensiveThing(idx));
该代码将保持数组的元素顺序,同时利用 java 并行。
添加回答
举报