大数据计算引擎MapReduce框架详解 您所在的位置:网站首页 内存mb等于多少m 大数据计算引擎MapReduce框架详解

大数据计算引擎MapReduce框架详解

2023-03-10 02:19| 来源: 网络整理| 查看: 265

今天来介绍下大数据计算引擎MapReduce,MapReduce主要用于离线计算,电商公司的离线计算任务大多数是用Hive将sql转化为MR程序来运行,可见MapReduce的重要性。

MapReduce介绍

MapReduce是一个分布式运算程序的编程框架。

MapReduce优缺点

优点:易于编程、有良好的扩展性、具有高容错性、适合PB级以上海量数据的离线处理。

缺点:不擅长实时计算、不擅长流式计算、不擅长DAG计算(DAG有向图MR也可以做,只是每个作业的输出结果都会写入磁盘,这样会造成大量的IO而导致性能降低)。

MapReduce核心编程思想

分布式计算程序需要分成至少2个阶段。第一个阶段的MapTask并发实例完全并行运行互不相干。第二个阶段的ReduceTask并发实例也互不相干,但是数据来源依赖上一阶段的所有MapTask并发实例的输出。MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果逻辑复杂可以多个MapReduce串行运行。 MapReduce进程

一个完整的MapReduce程序运行时有三种实例进程:

MrAppMaster:负责整个程序的过程调度及状态协调。MapTask:负责Map阶段的整个数据处理流程。ReduceTask:负责Reduce阶段的整个数据处理流程。 WordCount案例

根据官方文档,WordCount程序应编写类分别继承Mapper、Reducer并重写相应的方法,最后写驱动类来调用和提交。且数据的类型是Hadoop自身封装的序列化类型。

Java类型Hadoop Writable类型BooleanBooleanWritableByteByteWritableIntegerIntWritableFloatFloatWritableLongLongWritableDoubleDoubleWritableStringTextMapMapWritableArrayArrayWritable

需求:统计文本文件中每个单词出现的总次数。

创建maven工程并在pom.xml中添加依赖。编写继承Mapper的类。 public class WordcountMapper extends Mapper{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取一行数据 String line = value.toString(); //按空格切分 String[] words = line.split(" "); //封装并输出 for (String word : words) { k.set(word); context.write(k, v); } } } 编写继承Reducer的类。 public class WordcountReducer extends Reducer{ int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { //累加求和 sum = 0; for (IntWritable count : values) { sum += count.get(); } //输出 v.set(sum); context.write(key,v); } } 编写Driver驱动类。 public class WordcountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //获取配置信息 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //设置jar加载路径 job.setJarByClass(WordcountDriver.class); //设置map和reduce类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //设置map输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } } 将程序打成jar包并上传到集群。启动集群并执行程序hadoop jar wc.jar com.robofly.wordcount.WordcountDriver /dgf/input /output。 Hadoop序列化

序列化和反序列化:

序列化就是把内存中的对象转换成字节序列(或其他数据传输协议),便于存储到磁盘或网络传输。反序列化就是将收到的字节序列或者是磁盘的持久化数据,转换成内存中的对象。

为什么要序列化?

一般来说对象只存在内存中,断电就会消失,且对象只能由本地的进程使用不能发送到网络上的其他节点。然而序列化可以存储对象,可以将对象发送到其他节点。

为什么不用Java的序列化?

Java的序列化是重量级序列化框架,对象被序列化后会附带很多额外的信息,不便于在网络中高效传输,所以Hadoop自己开发了一套序列化机制Writable。

开发中当常用的基本序列化类型不能满足需求时就需要自己实现序列化接口,比如在Hadoop内部传递bean对象时。

自定义bean对象实现序列化接口:

必须实现Writable接口。反序列化时,需要反射调用空参构造函数,故必须有空参构造函数。重写序列化方法和反序列化方法,且顺序要一致。要让结果显示在文件中时需要重写toString()。如果需要将bean放在key中传输,还需要实现Comparable接口,因为MapReduce中的Shuffle过程要求对key必须能排序。 MapReduce框架原理

从上面可以知道MapReduce分为Map和Reduce两个阶段,下面就从Input到Output来解析MapReduce。

InputFormat数据输入 1、切片与MapTask并行度

之前说过块的大小是128M,数据块是HDFS在物理上把数据分成一块一块。而数据切片只是逻辑上对输入进行切片,并不会在磁盘上将其切分进行存储,且切片是对每一个文件单独切片。

每个Split切片都是分配一个MapTask并行实例进行处理,那么MapTask并行度就等于切片数。在默认情况下切片大小等于块的大小,是为了尽量避免跨节点传输数据。

2、Job提交流程源码和切片源码解析

Job提交流程主要有三步:

生成切片信息,并写到stag路径。生成配置信息,并写到stag路径。上传jar包,向集群提交job会向stag路径上传。

Job提交源码解析:

//1.1是1方法里面的方法,以此类推 //1.提交Job job.waitForCompletion(); //1.1.开始提交 submit(); //1.1.1.设置使用新的API setUseNewAPI(); //1.1.2.建立连接,会创建一个根据不同场景(本地或集群)下提交Job的对象 connect(); //1.1.2.1.创建提交Job的代理 new Cluster(getConfiguration()); //1.1.2.1.1.判断是本地还是集群(是本地则创建LocalJobRunner对象,是集群则创建YarnRunner对象) initialize(jobTrackAddr, conf); //1.1.3.提交job submitter.submitJobInternal(Job.this, cluster); //1.1.3.1.创建给集群提交数据的Stag路径(是本地就在项目的盘符中创建,是集群就在HDFS上创建) Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //1.1.3.2.获取jobid,并创建Job路径,路径是根据jobStagingArea和jobId拼接起来的 JobID jobId = submitClient.getNewJobID(); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); //1.1.3.3.拷贝jar包到集群(向集群提交就会向HDFS上传jar包) copyAndConfigureFiles(job, submitJobDir); //1.1.3.3.1.上传jar包 rUploader.uploadFiles(job, jobSubmitDir); //1.1.4.计算切片,生成切片规划文件写到submitJobDir路径中 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); //1.1.5.向submitJobFile路径写XML配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); //1.1.6.提交Job,返回提交状态(是本地submitClient就是LocalJobRunner,是集群submitClient就是YarnRunner) status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

FileInputFormat切片源码解析:

//FileInputFormat中的getSplits方法是用来生成切片信息的,看这一个方法即可 //获取文件路径 Path path = file.getPath(); //获取文件大小 long length = file.getLen(); //判断文件是否可切 if (isSplitable(job, path)) { //获取文件的块大小 long blockSize = file.getBlockSize(); //获取切片大小 /*计算切片大小,默认:切片大小=块大小 切片大小>块大小:修改minSize的值;切片大小 1.1 好处:让最后一片不会太小,不会浪费MapTask资源 缺点:会造成跨节点读数据(只会对最后一个MapTask造成跨节点读数据) */ //计算剩余文件是否可以切片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //块的索引 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //生成切片信息 /*splits:是集合用来装切片信息 makeSplit:该方法用来生成切片信息 参数1-path:文件路径,参数2-length-bytesRemaining:片的起始位置,参数3-splitSize:切片大小(偏移量) */ splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); //重新计算剩余文件大小 bytesRemaining -= splitSize; } //将剩余的文件切成一片 if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } 3、FileInputFormat切片机制

先来看下InputFormat的继承树: |----InputFormat(抽象类) |----|----FileInputFormat(抽象类) |----|----|----TextInputFormat(默认用来读取数据的类) |----|----|----CombineFileInputFormat(抽象类) |----|----|----|----CombineTextInputFormat(可以合并小文件)

InputFormat中的抽象方法:

//获取切片信息 public abstract List getSplits(JobContext context) throws IOException, InterruptedException; //获取RecordReader对象,该对象是真正用来读取数据的对象 public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

在FileInputFormat抽象类中重写了InputFormat中的getSplits方法,该方法用来生成切片信息。

TextInputFormat是FileInputFormat的实现类,重写了InputFormat中的createRecordReader方法,该方法返回了LineRecordReader,是RecordReader的子类,由名字可以看出是一行一行的读取数据的。TextInputFormat的键是存储该行在整个文件中的起始字节偏移量(LongWritable类型),值是这行的内容(Text类型)。

@Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { return new LineRecordReader(recordDelimiterBytes); }

FileInputFormat切片机制:

简单的按照文件的内容长度进行切片。切片大小默认等于块大小。切片时不考虑数据集整体,而是对每个文件单独切片。 4、CombineTextInputFormat切片机制

CombineTextInputFormat可以将多个小文件合并成一片进行处理。要想使用CombineTextInputFormat需要设置虚拟存储切片最大值(setMaxInputSplitSize)。

假设虚拟存储切片最大值为4M,现在有四个文件大小分别为1.5M、5.1M、3.8M、6.8M,则虚拟存储过程对四个文件划分的块大小如下: 1.5M //key.hashCode() & Integer.MAX_VALUE:用来保证结果一定为正数 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }

自定义Partitioner步骤:

自定义类继承Partitioner重写getPartition()方法。在job的驱动类中设置自定义Partitioner。自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask。

Partition分区总结:

如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx。如果1


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有