3 回答
TA贡献1783条经验 获得超4个赞
相反,它似乎在返回之前等待 map 方法在大多数元素上完成。
这是不正确的。
当谈到已经被处理的元素时,它将等待所有元素的完成,因为 Stream API 允许并发处理本质上不是线程安全的数据结构。在从终端操作返回之前,它必须确保所有潜在的并发访问都已完成。
在谈论整个流时,在 8 核机器上测试只有 14 个元素的流是不公平的。当然,至少会有 8 个并发操作开始,这就是全部。您正在通过使用findFirst()而不是为火焰添加燃料findAny(),因为这并不意味着按处理顺序返回第一个找到的元素,而是按遇到顺序返回第一个元素,即在您的示例中恰好为零,因此线程处理除第一个块之外的其他块不能假设他们的结果是正确答案,并且比使用 更愿意帮助处理其他候选人findAny()。
当你使用
List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
})
.filter(n -> n < 40_000)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
尽管流元素的数量要多得多,但您将获得相似数量的任务运行完成。
请注意,CompletableFuture它也不支持中断,因此我想到的唯一用于返回任何结果和取消其他作业的内置功能是旧的ExecutorService.invokeAny.
要为其构建映射和过滤功能,我们可以使用以下辅助函数:
static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
return () -> {
R r = f.apply(t);
if(!p.test(r)) throw new NoSuchElementException();
return r;
};
}
不幸的是,只有使用值或异常完成的选项,因此我们必须对不匹配的元素使用异常。
然后我们可以像这样使用它
ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
.mapToObj(i -> mapAndfilter(i,
n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
},
n -> n < 10_000))
.collect(Collectors.toList()));
log("result: "+result);
它不仅会取消挂起的任务,还会在不等待它们完成的情况下返回。
当然,这意味着操作作业的源数据必须是不可变的或线程安全的。
TA贡献1785条经验 获得超4个赞
您可以使用此代码来说明 parallelStream 的工作原理:
final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");
String result = list.parallelStream()
.map(s -> {
System.out.println("map: " + s);
return s;
})
.filter(s -> {
System.out.println("fiter: " + s);
return s.equals("8th");
})
.findFirst()
.orElse(null);
System.out.println("result=" + result);
有两种选择可以实现您的目标,用过滤器停止昂贵的操作:
根本不要使用流,使用简单的 for 或增强的 for
先过滤,然后用昂贵的操作映射
添加回答
举报