
Simple MapReduce Examples


Contents
Case 1: File merging and deduplication
Case 2: Sorting the integers in the input files
Case 3: Mining information from a given table

Case 1: File merging and deduplication

Given two input files, A and B, write a MapReduce program that merges them and removes duplicate entries, producing a new output file C. A sample of the input and output files is given below for reference.

A sample of input file A:

data
20150101 x
20150103 x
20150104 y
20150102 y
20150105 z
20150106 x

A sample of input file B:

data
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y

The output file C obtained by merging input files A and B:

data
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

Code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class hebing {
    public static class Mymapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole line as the key; duplicate lines become duplicate keys.
            context.write(value, new Text(""));
        }
    }

    public static class Myreducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The shuffle groups identical keys, so writing each key once removes the duplicates.
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "hebing");
        job.setJarByClass(hebing.class);
        job.setMapperClass(hebing.Mymapper.class);
        job.setCombinerClass(hebing.Myreducer.class);
        job.setReducerClass(hebing.Myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
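The input and output paths are hardcoded in main(), and GenericOptionsParser is imported but never used. A minimal sketch of a main() that reads the paths from the command line instead (an addition here, not part of the original program) could look like this:

    // Sketch only: replaces the main() above so the paths come from the command line,
    // e.g. "hadoop jar hebing.jar hebing /input /output" (assumed jar name).
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: hebing <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "hebing");
        job.setJarByClass(hebing.class);
        job.setMapperClass(hebing.Mymapper.class);
        job.setCombinerClass(hebing.Myreducer.class);
        job.setReducerClass(hebing.Myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }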

Case 2: Sorting the integers in the input files

There are several input files, and every line of each file contains a single integer. Read the integers from all the files, sort them in ascending order, and write them to a new output file. Each output line holds two integers: the first is the rank of the second integer in the sorted order, and the second is the integer itself. A sample of the input and output files is given below for reference.

A sample of input file 1:

data
33
37
12
40

A sample of input file 2:

data
4
16
39
5

A sample of input file 3:

data
1
45
25

The output file obtained from input files 1, 2, and 3:

rank data
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45

Code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {
    public static class Mymapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable v = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Use the integer itself as the key so the framework sorts it during the shuffle.
            v.set(Integer.parseInt(value.toString()));
            context.write(v, new IntWritable(1));
        }
    }

    public static class Myreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable line_num = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive in ascending order; emit one (rank, value) pair per occurrence.
            for (IntWritable num : values) {
                context.write(line_num, key);
                line_num = new IntWritable(line_num.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Sort.Mymapper.class);
        job.setReducerClass(Sort.Myreducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
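The job runs with the default single reducer, so the framework's sort of the map output keys already yields a globally ordered result, and duplicates survive because the reducer emits one line per value in the iterator. The listing imports Partitioner without defining one; if several reducers were wanted, a range partitioner along the following lines would keep the global order across the output files. This is a sketch added here, not part of the original article, and the upper bound of 100 is an assumed maximum input value:

    // Sketch: route each value range to its own reducer so the part-r-* files
    // can simply be concatenated in order. The bound is an assumption for illustration.
    public static class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
        @Override
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            int bound = 100;                               // assumed maximum value in the input
            int step = Math.max(bound / numPartitions, 1); // width of each reducer's value range
            return Math.min(key.get() / step, numPartitions - 1);
        }
    }

It would be registered with job.setPartitionerClass(Sort.RangePartitioner.class) together with job.setNumReduceTasks(n).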

Case 3: Mining information from a given table

A child-parent table is given below. Write a MapReduce program that mines the parent-child relationships in it and produces a table of grandchild-grandparent relationships. The input file is as follows:

child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma

The output file is as follows:

grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse

Code:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Child2Parent {
    public static class Mymapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] cap = value.toString().split("[\\s|\\t]+"); // split the record on whitespace
            if (!"child".equals(cap[0])) {                       // skip the header line
                String cName = cap[0];
                String pName = cap[1];
                // Tag both sides of the self-join: under the parent's key the value
                // carries the child (r#); under the child's key it carries the parent (l#).
                context.write(new Text(pName), new Text("r#" + cName));
                context.write(new Text(cName), new Text("l#" + pName));
            }
        }
    }

    public static class Myreduce extends Reducer<Text, Text, Text, Text> {
        public static int runtime = 0;

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (runtime == 0) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                runtime++;
            }
            List<String> grandChildren = new ArrayList<String>(); // r# values: children of the key person
            List<String> grandParents = new ArrayList<String>();  // l# values: parents of the key person
            for (Text text : values) {
                String[] relation = text.toString().split("#");
                if ("l".equals(relation[0])) {
                    grandParents.add(relation[1]);
                } else {
                    grandChildren.add(relation[1]);
                }
            }
            // Every child of the key person is a grandchild of every parent of the key person.
            for (String c : grandChildren) {
                for (String p : grandParents) {
                    context.write(new Text(c), new Text(p));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "TableJoin");
        job.setJarByClass(Child2Parent.class);
        job.setMapperClass(Child2Parent.Mymapper.class);
        job.setReducerClass(Child2Parent.Myreduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
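The reducer performs a reduce-side self-join: for each person appearing as a key, the shuffled values carry that person's children (tagged r#) and parents (tagged l#), and the cross product of the two lists yields the grandchild-grandparent pairs. The standalone snippet below (an illustration added here, not part of the original job) traces that step for the key Lucy from the sample input:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java trace of one reduce() call. For key "Lucy" the sample input produces
// the values r#Steven, r#Jone (her children) and l#Mary, l#Frank (her parents);
// their cross product is the four grandchild/grandparent pairs.
public class JoinTrace {
    public static void main(String[] args) {
        List<String> values = Arrays.asList("r#Steven", "r#Jone", "l#Mary", "l#Frank");
        List<String> grandChildren = new ArrayList<String>();
        List<String> grandParents = new ArrayList<String>();
        for (String v : values) {
            String[] relation = v.split("#");
            if ("l".equals(relation[0])) {
                grandParents.add(relation[1]);
            } else {
                grandChildren.add(relation[1]);
            }
        }
        for (String c : grandChildren) {
            for (String p : grandParents) {
                System.out.println(c + "\t" + p); // Steven Mary, Steven Frank, Jone Mary, Jone Frank
            }
        }
    }
}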

