为了账号安全,请及时绑定邮箱和手机立即绑定

Java 8 一个流到多个映射

Java 8 一个流到多个映射

波斯汪 2021-07-07 18:30:47
假设我有不适合内存的巨大网络服务器日志文件。我需要将此文件流式传输到 mapreduce 方法并保存到数据库。我使用 Java 8 流 api 执行此操作。例如,我在 mapreduce 过程之后得到一个列表,例如,按客户端消耗,按 ip 消耗,按内容消耗。但是,我的需求不像我的例子中给出的那样。由于我无法共享代码,我只想给出基本示例。通过 Java 8 Stream Api,我想只读取一次文件,同时获取 3 个列表,而我正在流式传输文件,并行或顺序。但平行会很好。有没有办法做到这一点?
查看完整描述

2 回答

?
跃然一笑

TA贡献1826条经验 获得超6个赞

我已经根据您的情况调整了这个问题的答案。自定义 Spliterator 会将流“拆分”为多个由不同属性收集的流:


@SafeVarargs

public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)

{

    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();

}


public static class ForkingSpliterator<T>

    extends AbstractSpliterator<T>

{

    private Spliterator<T>         sourceSpliterator;


    private List<BlockingQueue<T>> queues = new ArrayList<>();


    private boolean                sourceDone;


    @SafeVarargs

    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)

    {

        super(Long.MAX_VALUE, 0);


        sourceSpliterator = source.spliterator();


        for (Consumer<Stream<T>> fork : consumers)

        {

            LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

            queues.add(queue);

            new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();

        }

    }


    @Override

    public boolean tryAdvance(Consumer<? super T> action)

    {

        sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t)));

        return !sourceDone;

    }


    private class ForkedConsumer

        extends AbstractSpliterator<T>

    {

        private BlockingQueue<T> queue;


        private ForkedConsumer(BlockingQueue<T> queue)

        {

            super(Long.MAX_VALUE, 0);

            this.queue = queue;

        }


        @Override

        public boolean tryAdvance(Consumer<? super T> action)

        {

            while (queue.peek() == null)

            {

                if (sourceDone)

                {

                    // element is null, and there won't be no more, so "terminate" this sub stream

                    return false;

                }

            }


            // push to consumer pipeline

            action.accept(queue.poll());


            return true;

        }

    }

}

您可以按如下方式使用它:


streamForked(Stream.of(new Row("content1", "client1", "location1", 1),

                       new Row("content2", "client1", "location1", 2),

                       new Row("content1", "client1", "location2", 3),

                       new Row("content2", "client2", "location2", 4),

                       new Row("content1", "client2", "location2", 5)),

             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,

                                                                           Collectors.groupingBy(Row::getContent,

                                                                                                 Collectors.summingInt(Row::getConsumption))))),

             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,

                                                                           Collectors.groupingBy(Row::getLocation,

                                                                                                 Collectors.summingInt(Row::getConsumption))))),

             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,

                                                                           Collectors.groupingBy(Row::getLocation,

                                                                                                 Collectors.summingInt(Row::getConsumption))))));


// Output

// {client2={location2=9}, client1={location1=3, location2=3}}

// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}

// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}

请注意,您几乎可以对流的副本做任何想做的事情。根据您的示例,我使用堆叠groupingBy收集器按两个属性对行进行分组,然后总结 int 属性。所以结果将是一个Map<String, Map<String, Integer>>. 但您也可以将其用于其他场景:


rows -> System.out.println(rows.count())

rows -> rows.forEach(row -> System.out.println(row))

rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))


查看完整回答
反对 回复 2021-07-14
?
慕桂英546537

TA贡献1848条经验 获得超10个赞

通常收集到标准 API 以外的任何东西都可以通过自定义Collector. 在您的情况下,一次收集 3 个列表(只是一个编译的小例子,因为您也无法共享您的代码):


private static <T> Collector<T, ?, List<List<T>>> to3Lists() {

    class Acc {


        List<T> left = new ArrayList<>();


        List<T> middle = new ArrayList<>();


        List<T> right = new ArrayList<>();


        List<List<T>> list = Arrays.asList(left, middle, right);


        void add(T elem) {

            // obviously do whatever you want here

            left.add(elem);

            middle.add(elem);

            right.add(elem);

        }


        Acc merge(Acc other) {


            left.addAll(other.left);

            middle.addAll(other.middle);

            right.addAll(other.right);


            return this;

        }


        public List<List<T>> finisher() {

            return list;

        }


    }

    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);

}

并通过以下方式使用它:


Stream.of(1, 2, 3)

      .collect(to3Lists());

显然,这个自定义收集器没有做任何有用的事情,而只是一个如何使用它的示例。


查看完整回答
反对 回复 2021-07-14
  • 2 回答
  • 0 关注
  • 163 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信