3 回答

TA贡献1801条经验 获得超8个赞
Collections.synchronizedList()您在使用时应该使用parallelStream(). 因为ArrayList它不是线程安全的,并且在并发访问它时会出现意外的行为,就像您使用parallelStream().
我已经修改了您的代码,现在它可以正常工作:
private static void reduce_parallelStream() {
List<String> vals = Arrays.asList("a", "b");
// Use Synchronized List when with parallelStream()
List<String> join = vals.parallelStream().reduce(Collections.synchronizedList(new ArrayList<>()),
(l, v) -> {
l.add(v);
return l;
}, (a, b) -> a // don't use addAll() here to multiplicate the output like [a, b, a, b]
);
System.out.println(join);
}
输出:
有时你会得到这样的输出:
[a, b]
有时还有这个:
[b, a]
这样做的原因是它是一个parallelStream()所以你无法确定执行的顺序。

TA贡献1876条经验 获得超6个赞
因为它是并行流,所以要减少的第一个参数new ArrayList() 可能会为每个输入值 a 和 b 调用两次。
这就是你错的地方。第一个参数是单个ArrayList实例,而不是lambda 表达式可以产生多个ArrayList实例。
因此,整个缩减操作在单个ArrayList实例上进行。当多个线程ArrayList并行修改该值时,每次执行的结果可能会发生变化。
您combiner实际上将 a 的所有元素添加List到同一个List.
[a,b]如果 和accumulator函数combiner都会生成新的ArrayList而不是改变其输入,则可以获得预期的输出ArrayList:
List<String> join = vals.parallelStream().reduce(
new ArrayList<String>(),
(List<String> l, String v) -> {
List<String> cl = new ArrayList<>(l);
cl.add(v);
return cl;
}, (a, b) -> {
List<String> ca = new ArrayList<>(a);
ca.addAll(b);
return ca;
}
);
也就是说,您reduce根本不应该使用。collect是执行可变归约的正确方法:
List<String> join = vals.parallelStream()
.collect(ArrayList::new,ArrayList::add,ArrayList::addAll);
正如您所看到的,这里与 in 不同reduce,您传递的第一个参数是 a Supplier<ArrayList<String>>,它可用于生成所需数量的中间ArrayList实例。

TA贡献1856条经验 获得超5个赞
这很简单,第一个参数是身份,或者我会说从零开始。因为parallelStream usage这个值被重用。这意味着并发问题(添加中的空值)和重复问题。
这可以通过以下方式修补:
final ArrayList<String> zero = new ArrayList<>();
List<String> join = vals.parallelStream().reduce(zero,
(List<String> l, String v) -> {
if (l == zero) {
l = new ArrayList<>();
}
l.add(v);
return l;
}, (a, b) -> {
// See comment of Holger:
if (a == zero) return b;
if (b == zero) return a;
a.addAll(b);
return a;
}
);
安全的。
您可能想知道为什么reduce身份提供函数没有重载。原因是collect这里应该使用它。
添加回答
举报