为了账号安全,请及时绑定邮箱和手机立即绑定

hadoop 自定义可写不产生预期的输出

hadoop 自定义可写不产生预期的输出

繁花不似锦 2021-10-13 16:03:13
我有一组来自映射器的减速器输入:(1939, [121, 79, 83, 28]) (1980, [0, 211, −113])我想得到如下输出:1939 max:121 min:28 avg: 77.75如果我在我的 reducer 类中不使用如下自定义可写,我可以得到它:public static class MaxTemperatureReducer      extends Reducer<Text, IntWritable, Text, Text> {          Text yearlyValue = new Text();      @Override      public void reduce(Text key, Iterable<IntWritable> values,          Context context)          throws IOException, InterruptedException {            int sum = 0;            int CounterForAvg = 0;            int minValue = Integer.MAX_VALUE;            int maxValue = Integer.MIN_VALUE;            float avg;            for (IntWritable val : values) {                int currentValue = val.get();                sum += currentValue;                CounterForAvg++;                minValue = Math.min(minValue, currentValue);                maxValue = Math.max(maxValue, currentValue);            }            avg = sum / CounterForAvg;            String requiredValue = "max temp:"+maxValue + "\t" +"avg temp: "+ avg + "\t"+ "min temp: " +minValue;            yearlyValue.set(requiredValue);            context.write(key, yearlyValue);      }    }但是,使用 customwritable 类会产生以下结果:1939 1211939 791939 831939 281980 01980 2111980 -113这是我如何实现自定义类和减速器。我将可迭代对象发送到自定义类并在那里执行计算。我无法弄清楚我在这里做错了什么。我在 Java 中有 0 exp。
查看完整描述

1 回答

?
慕容3067478

TA贡献1773条经验 获得超3个赞

合并的调用不应该帮助我连接值吗


当然可以,但你没有正确使用它。out从未初始化。


  CompositeWritable out; // null here

  Text textYearlyValue = new Text();


  public void reduce(Text key, Iterable<IntWritable> values,

      Context context)

      throws IOException, InterruptedException {

         out.merge(values); // still null, should throw an exception

如果你想输出一行字符串,你可以只使用一个Text对象。你的merge(Iterable<IntWritable> values)方法可以去任何地方,它不必在一个完全独立的类中来返回一个 Writable 对象。


但无论如何,如果练习是为了学习如何实现自定义可写,那么就可以了。

注意事项:

  1. 如果你想“组合”多个字段,那么你应该声明它们

  2. readFields并且write需要按照相同的顺序

  3. toString确定您在使用TextOutputFormat(默认)时在减速器输出中看到的内容

  4. equals并且hashCode为了完整性而添加(理想情况下你会实现WritableComparable,但这真的只对键重要,而不是那么多值)

  5. 为了与其他 Writables 类似,我将您的merge方法重命名为set.

你可以期待下面的输出看起来像

1939    MinMaxAvgWritable{min=28, max=121, avg=77.75}

1980    MinMaxAvgWritable{min=-113, max=211, avg=32.67}

public class MinMaxAvgWritable implements Writable {


    private int min, max;

    private double avg;


    private DecimalFormat df = new DecimalFormat("#.00");


    @Override

    public String toString() {

        return "MinMaxAvgWritable{" +

                "min=" + min +

                ", max=" + max +

                ", avg=" + df.format(avg) +

                '}';

    }


    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        MinMaxAvgWritable that = (MinMaxAvgWritable) o;

        return min == that.min &&

                max == that.max &&

                avg == that.avg;

    }


    @Override

    public int hashCode() {

        return Objects.hash(min, max, avg);

    }


    @Override

    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeInt(min);

        dataOutput.writeInt(max);

        dataOutput.writeDouble(avg);

    }


    @Override

    public void readFields(DataInput dataInput) throws IOException {

        this.min = dataInput.readInt();

        this.max = dataInput.readInt();

        this.avg = dataInput.readDouble();

    }


    public void set(int min, int max, double avg) {

        this.min = min;

        this.max = max;

        this.avg = avg;

    }


    public void set(Iterable<IntWritable> values) {

        this.min = Integer.MAX_VALUE;

        this.max = Integer.MIN_VALUE;


        int sum = 0;

        int count = 0;

        for (IntWritable iw : values) {

            int i = iw.get();

            if (i < this.min) this.min = i;

            if (i > max) this.max = i;

            sum += i;

            count++;

        }


        this.avg = count < 1 ? sum : (sum / (1.0*count));

    }

}

有了这个,reducer就很简单了


public class CompositeReducer extends Reducer<Text, IntWritable, Text, MinMaxAvgWritable> {


    private final MinMaxAvgWritable output = new MinMaxAvgWritable();


    @Override

    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // This 'set/merge' method could just as easily be defined here, and return a String to be set on a Text object

        output.set(values);  

        context.write(key, output);

    }

}

工作是这样设置的


    // outputs for mapper and reducer

    job.setOutputKeyClass(Text.class);


    // setup mapper

    job.setMapperClass(TokenizerMapper.class);  // Replace with your mapper

    job.setMapOutputValueClass(IntWritable.class);


    // setup reducer

    job.setReducerClass(CompositeReducer.class);

    job.setOutputValueClass(MinMaxAvgWritable.class); // notice custom writable


    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));


    return job.waitForCompletion(true) ? 0 : 1;


查看完整回答
反对 回复 2021-10-13
  • 1 回答
  • 0 关注
  • 139 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信