为了账号安全,请及时绑定邮箱和手机立即绑定

一旦找到任何匹配项,如何停止并行流?

一旦找到任何匹配项,如何停止并行流?

慕桂英3389331 2021-09-29 13:31:39
我正在尝试找到与给定谓词匹配的列表的第一个(任何)成员,如下所示:Item item = items.parallelStream()  .map(i -> i.doSomethingExpensive())  .filter(predicate)  .findAny()  .orElse(null);我希望一旦findAny()获得匹配,它会立即返回,但情况似乎并非如此。相反,它似乎在返回之前等待 map 方法在大多数元素上完成。如何立即返回第一个结果并取消其他并行流?有没有比使用诸如 的流更好的方法来做到这一点CompletableFuture?这是一个显示行为的简单示例:private static void log(String msg) {    private static void log(String msg) {    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");    System.out.println(sdf.format(new Date()) + " " + msg);}Random random = new Random();List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);Optional<Integer> num = nums.parallelStream()  .map(n -> {    long delay = Math.abs(random.nextLong()) % 10000;    log("Waiting on " + n + " for " + delay + " ms");    try { Thread.sleep(delay); }    catch (InterruptedException e) { System.err.println("Interruption error"); }    return n * n;  })  .filter(n -> n < 30)  .peek(n -> log("Found match: " + n))  .findAny();log("First match: " + num);日志输出:14:52:27.061 Waiting on 9 for 2271 ms14:52:27.061 Waiting on 2 for 1124 ms14:52:27.061 Waiting on 13 for 547 ms14:52:27.061 Waiting on 4 for 517 ms14:52:27.061 Waiting on 1 for 1210 ms14:52:27.061 Waiting on 6 for 2646 ms14:52:27.061 Waiting on 0 for 4393 ms14:52:27.061 Waiting on 12 for 5520 ms14:52:27.581 Found match: 1614:52:27.582 Waiting on 3 for 5365 ms14:52:28.188 Found match: 414:52:28.275 Found match: 114:52:31.457 Found match: 014:52:32.950 Found match: 914:52:32.951 First match: Optional[0]一旦找到匹配项(在本例中为 16),findAny()不会立即返回,而是阻塞直到其余线程完成。在这种情况下,调用者在找到匹配后返回之前要额外等待 5 秒。
查看完整描述

3 回答

?
慕娘9325324

TA贡献1783条经验 获得超4个赞

相反,它似乎在返回之前等待 map 方法在大多数元素上完成。


这是不正确的。


当谈到已经被处理的元素时,它将等待所有元素的完成,因为 Stream API 允许并发处理本质上不是线程安全的数据结构。在从终端操作返回之前,它必须确保所有潜在的并发访问都已完成。


在谈论整个流时,在 8 核机器上测试只有 14 个元素的流是不公平的。当然,至少会有 8 个并发操作开始,这就是全部。您正在通过使用findFirst()而不是为火焰添加燃料findAny(),因为这并不意味着按处理顺序返回第一个找到的元素,而是按遇到顺序返回第一个元素,即在您的示例中恰好为零,因此线程处理除第一个块之外的其他块不能假设他们的结果是正确答案,并且比使用 更愿意帮助处理其他候选人findAny()。


当你使用


List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());

Optional<Integer> num = nums.parallelStream()

        .map(n -> {

            long delay = ThreadLocalRandom.current().nextInt(10_000);

            log("Waiting on " + n + " for " + delay + " ms");

            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));

            return n * n;

        })

        .filter(n -> n < 40_000)

        .peek(n -> log("Found match: " + n))

        .findAny();


log("First match: " + num);

尽管流元素的数量要多得多,但您将获得相似数量的任务运行完成。


请注意,CompletableFuture它也不支持中断,因此我想到的唯一用于返回任何结果和取消其他作业的内置功能是旧的ExecutorService.invokeAny.


要为其构建映射和过滤功能,我们可以使用以下辅助函数:


static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {

    return () -> {

        R r = f.apply(t);

        if(!p.test(r)) throw new NoSuchElementException();

        return r;

    };

}

不幸的是,只有使用值或异常完成的选项,因此我们必须对不匹配的元素使用异常。


然后我们可以像这样使用它


ExecutorService es = ForkJoinPool.commonPool();

Integer result = es.invokeAny(IntStream.range(0, 100)

    .mapToObj(i -> mapAndfilter(i,

        n -> {

            long delay = ThreadLocalRandom.current().nextInt(10_000);

            log("Waiting on " + n + " for " + delay + " ms");

            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));

            return n * n;

        },

        n -> n < 10_000))

    .collect(Collectors.toList()));


log("result: "+result);

它不仅会取消挂起的任务,还会在不等待它们完成的情况下返回。


当然,这意味着操作作业的源数据必须是不可变的或线程安全的。


查看完整回答
反对 回复 2021-09-29
?
九州编程

TA贡献1785条经验 获得超4个赞

您可以使用此代码来说明 parallelStream 的工作原理:


final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");


    String result = list.parallelStream()

                        .map(s -> {

                            System.out.println("map: " + s);

                            return s;

                        })

                        .filter(s -> {

                            System.out.println("fiter: " + s);

                            return s.equals("8th");

                        })

                        .findFirst()

                        .orElse(null);


    System.out.println("result=" + result);

有两种选择可以实现您的目标,用过滤器停止昂贵的操作:


根本不要使用流,使用简单的 for 或增强的 for

先过滤,然后用昂贵的操作映射


查看完整回答
反对 回复 2021-09-29
  • 3 回答
  • 0 关注
  • 175 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信