wc词频统计代码: 由三个阶段组成: 1)Mapper阶段 (1)用户自定义的Mapper要继承自己的父类 Mapper (2)Mapper的输入数据是KV对的形式(KV的类型可自定义) (3)Mapper中的业务逻辑写在map()方法中 (4)Mapper的输出数据是KV对的形式(KV的类型可自定义) (5)map()方法(maptask进程)对每一个<K,V>调用一次 2)Reducer阶段 (1)用户自定义的Reducer要继承自己的父类 (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV (3)Reducer的业务逻辑写在reduce()方法中 (4)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法 3)Driver阶段 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象 mapTest -->Mapper<偏移量,每行的内容,key输出,value输出>
package jh.com.mapReduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class map extends Mapper<LongWritable,Text,Text,IntWritable>{ Text t = new Text(); IntWritable intWritable=new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取每行内容,根据指定分隔符 分割单词 ,传给下一个阶段 reduece task String s = value.toString(); String[] split = s.split(" "); for (String s1 : split) { t.set(s1); context.write(t,intWritable); } } }map task全部执行结束后,会进入reduce阶段: 注意:一次进入reduce的数据中 key值是一样的 reduce --> Reducer<map key输出的类型,map value输出的类型,reduce key输出,value输出> 重写了一个方法 reduce (key单词,iter相同key的所有value值,context)
package jh.com.mapReduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class reduce extends Reducer<Text,IntWritable,Text,IntWritable>{ IntWritable intWritable = new IntWritable(); //三个参数 map端输出key值 2.相同key的 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //遍历迭代器,汇总计算 int sum=0; for (IntWritable value : values) { // 获取intwritable中的值 int i = value.get(); sum += i; } intWritable.set(sum); context.write(key,intWritable); } }driver类用于提交任务
package jh.com.mapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.File; import java.io.IOException; public class driver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //driver 用于提交任务,暂时通过job来提交任务 Configuration configuration = new Configuration(); //参数说明 1.配置文件 2.文件名称 Job job = Job.getInstance(configuration, driver.class.getSimpleName()); //指定driver类 job.setJarByClass(driver.class); //指定map类 job.setMapperClass(map.class); //指定map的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定reduce类 job.setReducerClass(reduce.class); //指定reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定文件的输入路径 FileInputFormat.setInputPaths(job,new Path("C:\\Users\\dell\\Desktop\\学习\\System.txt")); //指定文件的输出路径 不能提前存在 FileOutputFormat.setOutputPath(job,new Path("F:\\output001")); //调教任务 boolean b = job.waitForCompletion(true); if (b){ System.out.println("任务执行成功"); }else { System.out.println("任务执行失败"); } } }