package com.zkpk.us;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;class UserMapper extends Mapper<LongWritable, Text, Text, IntWritable> { protected void map( LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws java.io.IOException, InterruptedException { IntWritable one = new IntWritable(1); String[] columns = value.toString().split("\t"); if (columns != null && columns.length == 6) { Text uid = new Text(columns[1]); context.write(uid, one); } };}class UserReducer extends Reducer<Text, IntWritable, Text, IntWritable> { protected void reduce( Text key, java.lang.Iterable<IntWritable> values, org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws java.io.IOException, InterruptedException { int sum = 0; for(IntWritable value : values){ sum += value.get(); } context.write(key, new IntWritable(sum)); };}public class UserCount { /** * @param args */ public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "UserUid"); job.setJarByClass(UserCount.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(UserMapper.class); job.setReducerClass(UserReducer.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class); job.waitForCompletion(true); }}
3 回答
赵强老师
TA贡献10条经验 获得超14个赞
这是MapReduce程序,是Hadoop处理数据的核心。程序由三部分组成:
第一部分:Map。表示将数据按分隔符进行切分处理,分隔符是制表符(Tab)。
第二部分:Reduce。将Map的输出进行汇总,得到最后的输出。
第三部分:主程序。将Map和Reduce组成一个任务job来执行,数据的输入和输出都来自于HDFS。
有问题,可以再问我。呵呵
- 3 回答
- 0 关注
- 1393 浏览
添加回答
举报
0/150
提交
取消