MapReduce统计单词数目详细说明 您所在的位置:网站首页 java统计数据出现次数思路 MapReduce统计单词数目详细说明

MapReduce统计单词数目详细说明

2024-05-26 23:18| 来源: 网络整理| 查看: 265

文章目录 一、准备数据二、MR的编程规范三、统计本地文件的单词数代码四、统计分布式文件系统的单词数

一、准备数据 注意:准备的数据的格式必须是文本 编码必须是utf-8无bom!

在这里插入图片描述

二、MR的编程规范

基础知识请参考我这篇博客:MapReduce核心详解

MR的编程只需要将自定义的组件和系统默认组件进行组合,组合之后运行即可!

编程步骤: ①Map阶段的核心处理逻辑需要编写在Mapper中 ②Reduce阶段的核心处理逻辑需要编写在Reducer中 ③将编写的Mapper和Reducer进行组合,组合成一个Job ④对Job进行设置,设置后运行

三、统计本地文件的单词数代码

Mapper阶段

package com.ygp.hadoop.mapreduce; import java.io.IOException; import java.util.List; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; /* * 注意:导包时,导入 org.apache.hadoop.mapreduce包下的类(2.0的新api) * * 1. 自定义的类必须复合MR的Mapper的规范 * * 2.在MR中,只能处理key-value格式的数据 * KEYIN, VALUEIN: mapper输入的k-v类型。 由当前Job的InputFormat的RecordReader决定! * 封装输入的key-value由RR自动进行。 * * KEYOUT, VALUEOUT: mapper输出的k-v类型: 自定义 * * 3. InputFormat的作用: * ①验证输入目录中文件格式,是否符合当前Job的要求 * ②生成切片,每个切片都会交给一个MapTask处理 * ③提供RecordReader,由RR从切片中读取记录,交给Mapper进行处理 * * 方法: List getSplits: 切片 * RecordReader createRecordReader: 创建RR * * 默认hadoop使用的是TextInputFormat,TextInputFormat使用LineRecordReader! * * 4. 在Hadoop中,如果有Reduce阶段。通常key-value都需要实现序列化协议! * * MapTask处理后的key-value,只是一个阶段性的结果! * 这些key-value需要传输到ReduceTask所在的机器! * 将一个对象通过序列化技术,序列化到一个文件中,经过网络传输到另外一台机器, * 再使用反序列化技术,从文件中读取数据,还原为对象是最快捷的方式! * * java的序列化协议: Serilizxxxxx * 特点:不仅保存对象的属性值,类型,还会保存大量的包的结构,子父类和接口的继承信息! * 重 * hadoop开发了一款轻量级的序列化协议: Wriable机制! * * */ public class WCMapper extends Mapper{ private Text out_key=new Text(); private IntWritable out_value=new IntWritable(1); // 针对输入的每个 keyin-valuein调用一次 (0,hello hi hello hi) @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { System.out.println("keyin:"+key+"----keyout:"+value); String[] words = value.toString().split(","); for (String word : words) { out_key.set(word); //写出数据(单词,1) context.write(out_key, out_value); } } }

Reducer阶段

package com.ygp.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* * 1. Reducer需要复合Hadoop的Reducer规范 * * 2. KEYIN, VALUEIN: Mapper输出的keyout-valueout * KEYOUT, VALUEOUT: 自定义 * */ public class WCReducer extends Reducer{ private IntWritable out_value=new IntWritable(); // reduce一次处理一组数据,key相同的视为一组 @Override protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { int sum=0; for (IntWritable intWritable : values) { sum+=intWritable.get(); } out_value.set(sum); //将累加的值写出 context.write(key, out_value); } }

Driver部分

package com.ygp.hadoop.mapreduce; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /* * 1.一旦启动这个线程,运行Job * * 2.本地模式主要用于测试程序是否正确! * * 3. 报错: * ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control */ public class WCDriver { public static void main(String[] args) throws Exception { Path inputPath=new Path("C:\\Users\\Lenovo\\Desktop\\05-Mapreduce\\mrinput\\wordcount"); Path outputPath=new Path("C:\\Users\\Lenovo\\Desktop\\05-Mapreduce\\mroutput\\wordcount"); //作为整个Job的配置 Configuration conf = new Configuration(); //保证输出目录不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // ①创建Job Job job = Job.getInstance(conf); // 为Job创建一个名字 job.setJobName("wordcount"); // ②设置Job // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化 // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置输入目录和输出目录 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // ③运行Job job.waitForCompletion(true); } }

然后看看执行driver后,在输出目录下的输出结果: 在这里插入图片描述 在这里插入图片描述

四、统计分布式文件系统的单词数

在我搭的分布式集群中统计文件的单词数目,首先你得把hdfs给打开,如下图: 在这里插入图片描述 然后再把yarn给启动了: 在这里插入图片描述 然后我们将要统计的文件先上传到分布式文件系统中: 在这里插入图片描述

然后我们执行如下程序(Mapper与Reducer不变,只是在driver里告诉程序yarn的位置):

public class WCDriver { public static void main(String[] args) throws Exception { //Path inputPath=new Path("C:\\Users\\Lenovo\\Desktop\\05-Mapreduce\\mrinput\\wordcount"); //Path outputPath=new Path("C:\\Users\\Lenovo\\Desktop\\05-Mapreduce\\mroutput\\wordcount"); Path inputPath=new Path("/wordcount.txt"); Path outputPath=new Path("/mroutput/wordcount"); //作为整个Job的配置 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); // 在YARN上运行 conf.set("mapreduce.framework.name", "yarn"); // RM所在的机器 conf.set("yarn.resourcemanager.hostname", "hadoop2"); //保证输出目录不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // ①创建Job Job job = Job.getInstance(conf); // 为Job创建一个名字 job.setJobName("wordcount"); // ②设置Job // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化 // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置输入目录和输出目录 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // ③运行Job job.waitForCompletion(true); } }

然后报错了,他说我输入目录不存在,但是刚刚明明看到有这个文件存在。 在这里插入图片描述 这时,我们需要将代码变动:

public static void main(String[] args) throws Exception { //Path inputPath=new Path("C:\\Users\\Lenovo\\Desktop\\05-Mapreduce\\mrinput\\wordcount"); //Path outputPath=new Path("C:\\Users\\Lenovo\\Desktop\\05-Mapreduce\\mroutput\\wordcount"); Path inputPath=new Path("/wordcount.txt"); Path outputPath=new Path("/mroutput/wordcount"); //作为整个Job的配置 Configuration conf = new Configuration(); //conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); // 在YARN上运行 conf.set("mapreduce.framework.name", "yarn"); // RM所在的机器 conf.set("yarn.resourcemanager.hostname", "hadoop2"); //保证输出目录不存在 FileSystem fs=FileSystem.get(new URI("hdfs://hadoop1:9000"), conf, "ygp"); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // ①创建Job Job job = Job.getInstance(conf); // 为Job创建一个名字 job.setJobName("wordcount"); // ②设置Job // 告诉NM运行时,MR中Job所在的Jar包在哪里 //job.setJar("MapReduce-0.0.1-SNAPSHOT.jar"); // 将某个类所在地jar包作为job的jar包 job.setJarByClass(WCDriver.class); // 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); // Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化 // 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置输入目录和输出目录 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // ③运行Job job.waitForCompletion(true); } }

要告诉分布式文件系统所要找的类的类名是什么。当然在此之前,我们要用maven将我们写的代码打成jar包,对项目执行maven build。 在这里插入图片描述

在这里插入图片描述 然后把该jar包上传到hadoop2中。 在这里插入图片描述 然后执行hadoop jar 打包的包名 主类的全类名 在这里插入图片描述 然后查看一下输出目录,发现已经成功了。 在这里插入图片描述 然后将上图所指文件下载,打开验证一下,与本地统计的情况一致。 在这里插入图片描述



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有