
Hadoop Big Data Case Study: Analyzing Five Million Sogou Search Records

2024-05-30 17:02 | Source: compiled from the web

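I recently used Hive, MapReduce, and HBase from the Hadoop ecosystem to run a fairly practical analysis of a publicly available Sogou search log of five million records. It is a good exercise for beginners: working with a larger dataset exposes problems you do not normally run into, and solving them builds a useful foundation for real big data projects.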

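Data source: https://download.csdn.net/download/leoe_/10429414

Data description: the Sogou five-million-record dataset is preprocessed production data from the Sogou search engine. It is real and large enough to meet the data requirements of a course project on distributed computing application development.

Record format: access time\tuser ID\t[query term]\trank of the URL in the returned results\tsequence number of the user's click\tURL clicked by the user

The user ID is assigned automatically from the cookie sent when the user visits the search engine with a browser, so different queries entered in the same browser session share one user ID.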
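To make the six-field layout concrete, here is a minimal sketch of a line parser in plain Java. The class name SogouRecordSketch, the Record field names, and the sample line are illustrative assumptions and do not come from the dataset or the original article; the sketch only assumes that fields are tab-separated and that the rank and click order are integers.

```java
import java.util.Optional;

public class SogouRecordSketch {

    /** One parsed log line. Field names are hypothetical, chosen for illustration. */
    public static final class Record {
        public final String time;
        public final String uid;
        public final String query;
        public final int rank;
        public final int order;
        public final String url;

        Record(String time, String uid, String query, int rank, int order, String url) {
            this.time = time;
            this.uid = uid;
            this.query = query;
            this.rank = rank;
            this.order = order;
            this.url = url;
        }
    }

    /** Returns the parsed record, or empty if the line does not have six well-formed fields. */
    public static Optional<Record> parse(String line) {
        // A limit of -1 keeps trailing empty fields instead of silently dropping them.
        String[] f = line.split("\t", -1);
        if (f.length != 6) {
            return Optional.empty();
        }
        try {
            int rank = Integer.parseInt(f[3].trim());
            int order = Integer.parseInt(f[4].trim());
            return Optional.of(new Record(f[0], f[1], f[2], rank, order, f[5]));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Made-up sample line, not taken from the real data.
        String line = "20111230000005\tuid-0001\t[example query]\t1\t1\thttp://example.com/";
        parse(line).ifPresent(r -> System.out.println(r.uid + " rank=" + r.rank));
    }
}
```

Returning an empty Optional for malformed lines lets a caller skip bad records instead of failing on the first one, which matters on a file of this size.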

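Requirements:

1. Load the raw data into HDFS.
2. Split the time field of the raw data and append year, month, day, and hour fields.
3. Load the processed data into HDFS.
4. Implement each of the following with both MapReduce and Hive:
   (1) Total number of records
   (2) Number of records with a non-empty query
   (3) Number of records without duplicates
   (4) Number of distinct UIDs
   (5) Query frequency ranking (the 50 most frequent terms)
   (6) Number of users with more than 2 queries
   (7) Share of users with more than 2 queries
   (8) Share of clicks whose rank is within 10
   (9) Share of queries entered directly as a URL
   (10) UIDs that searched for "仙剑奇侠传" more than 3 times
5. Save the result of each step in 4 to HDFS.
6. Import the files generated in 5 into HBase (one table) through the Java API.
7. Query the results imported in 6 with HBase shell commands.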

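Source data processing

1. Load the raw data into HDFS.

hadoop fs -put sogou.500w.utf8 /sogou/data

2. Split the time field of the raw data and append year, month, day, and hour fields.

bash sogou-log-extend.sh sogou.500w.utf8 sogou.500w.utf8.ext

The contents of sogou-log-extend.sh:

#!/bin/bash
#infile=/root/file/sogou.500w.utf8
infile=$1
#outfile=/root/filesogou.500w.utf8.final
outfile=$2
# The time field is assumed to be yyyyMMddHHmmss, so with awk's 1-based substr
# the year is characters 1-4, the month 5-6, the day 7-8, and the hour 9-10.
awk -F '\t' '{print $0"\t"substr($1,1,4)"year\t"substr($1,5,2)"month\t"substr($1,7,2)"day\t"substr($1,9,2)"hour"}' "$infile" > "$outfile"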
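The same field extraction can be sketched in Java, which makes the character offsets easier to check. This is an illustration only: the class and method names are hypothetical, the timestamp is assumed to have the form yyyyMMddHHmmss, and the literal suffixes (year, month, day, hour) mirror what the shell script appends.

```java
public class TimeFieldsSketch {

    /**
     * Appends year, month, day and hour columns derived from the first
     * tab-separated field. Lines whose first field is shorter than ten
     * characters are returned unchanged.
     */
    public static String extend(String line) {
        String time = line.split("\t", 2)[0];
        if (time.length() < 10) {
            return line;
        }
        // Java substring is 0-based and end-exclusive; awk's substr is 1-based.
        String year = time.substring(0, 4);
        String month = time.substring(4, 6);
        String day = time.substring(6, 8);
        String hour = time.substring(8, 10);
        return line + "\t" + year + "year\t" + month + "month\t" + day + "day\t" + hour + "hour";
    }

    public static void main(String[] args) {
        // Made-up sample line, not taken from the real data.
        System.out.println(extend("20111230000005\tuid-0001\t[example query]\t1\t1\thttp://example.com/"));
    }
}
```

The hour starts at the ninth character of the timestamp, so an offset of 8 in awk would pick up the last digit of the day instead.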

3. Load the processed data into HDFS.

hadoop fs -put sogou.500w.utf8.ext /sogou/data

Analyzing the data with Hive

First, load the data into the Hive warehouse:

Show the databases: show databases;
Create the database: create database sogou;
Use the database: use sogou;
Create the sougou table: create table sougou(time string, uuid string, name string, num1 int, num2 int, url string) row format delimited fields terminated by '\t';
Load the data into the table: load data local inpath 'sogou.500w.utf8' into table sougou;
Show the table definition: desc sougou;

Total number of records
HQL: select count(*) from sougou;

Number of records with a non-empty query
HQL: select count(*) from sougou where name is not null and name != '';

Number of records without duplicates
HQL: select count(*) from (select * from sougou group by time,num1,num2,uuid,name,url having count(*)=1) a;

Number of distinct UIDs
HQL: select count(distinct uuid) from sougou;

Query frequency ranking (the 50 most frequent terms)
HQL: select name,count(*) as pd from sougou group by name order by pd desc limit 50;

Number of users with more than 2 queries
HQL: select count(a.uuid) from (select uuid,count(*) as cnt from sougou group by uuid having cnt > 2) a;

Share of clicks whose rank is within 10
HQL: select count(*) from sougou where num1 [...] 3;

Analyzing the data with MapReduce

(1) Total number of records

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MRCountAll {
    // A job counter is aggregated by the framework across all tasks. A static
    // field is only visible to the driver when the job runs in local mode.
    public enum Stat { RECORDS }

    public static class CountAllMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.getCounter(Stat.RECORDS).increment(1);
        }
    }

    public static long runcount(String inputPath, String outPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(MRCountAll.class);
        job.setMapperClass(CountAllMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        job.waitForCompletion(true);
        return job.getCounters().findCounter(Stat.RECORDS).getValue();
    }

    public static void main(String[] args) throws Exception {
        long total = runcount("/sogou/data/sogou.500w.utf8", "/sogou/data/CountAll");
        System.out.println("Total records: " + total);
    }
}

(2) Number of records with a non-empty query

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountNotNull {
    // Aggregated by the framework across tasks, unlike a static field.
    public enum Stat { NOT_NULL }

    public static class wyMap extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            // isEmpty() compares content; `!= ""` would only compare references.
            if (values.length > 2 && !values[2].isEmpty()) {
                context.write(new Text(values[1]), ONE);
                context.getCounter(Stat.NOT_NULL).increment(1);
            }
        }
    }

    public static long run(String inputPath, String outputPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "countnotnull");
        job.setJarByClass(CountNotNull.class);
        job.setMapperClass(wyMap.class);
        //job.setReducerClass(wyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
        return job.getCounters().findCounter(Stat.NOT_NULL).getValue();
    }

    public static void main(String[] args) throws Exception {
        long count = run("/sogou/data/sogou.500w.utf8", "/sogou/data/CountNotNull");
        System.out.println("Records with a non-empty query: " + count);
    }
}
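The emptiness check is the part of this step that is easiest to get wrong in Java, because comparing a string with `!= ""` tests object identity rather than content. A minimal sketch of the check in isolation, without Hadoop, could look like this; the class and method names are hypothetical, and treating a whitespace-only query as empty is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;

public class NonEmptyQuerySketch {

    /** True if the line has a third tab-separated field that is not blank. */
    public static boolean hasQuery(String line) {
        // A limit of -1 keeps empty fields so that the field positions stay stable.
        String[] f = line.split("\t", -1);
        return f.length > 2 && !f[2].trim().isEmpty();
    }

    /** Counts the lines whose query field is present and not blank. */
    public static long countNonEmpty(List<String> lines) {
        return lines.stream().filter(NonEmptyQuerySketch::hasQuery).count();
    }

    public static void main(String[] args) {
        // Made-up sample lines, not taken from the real data.
        List<String> lines = Arrays.asList(
                "20111230000005\tu1\t[query]\t1\t1\thttp://example.com/",
                "20111230000006\tu2\t\t1\t1\thttp://example.com/");
        System.out.println(countNonEmpty(lines));
    }
}
```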

(3) Number of records without duplicates

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountNotRepeat {
    // Aggregated by the framework across tasks, unlike a static field.
    public enum Stat { DISTINCT_RECORDS }

    public static class NotRepeatMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String time = values[0];
            String uid = values[1];
            String name = values[2];
            String url = values[5];
            // Join with a separator so that different field splits cannot
            // produce the same concatenated key.
            context.write(new Text(time + "\t" + uid + "\t" + name + "\t" + url), new Text("1"));
        }
    }

    public static class NotRepeatReduc extends Reducer<Text, Text, Text, IntWritable> {
        private int index = 0; // running index within this reduce task

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // reduce() is called once per distinct key.
            index++;
            context.getCounter(Stat.DISTINCT_RECORDS).increment(1);
            context.write(new Text(key.toString()), new IntWritable(index));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "countnotnull");
        job.setJarByClass(CountNotRepeat.class);
        job.setMapperClass(NotRepeatMap.class);
        job.setReducerClass(NotRepeatReduc.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotRepeat"));
        job.waitForCompletion(true);
        long distinct = job.getCounters().findCounter(Stat.DISTINCT_RECORDS).getValue();
        System.out.println("Records without duplicates: " + distinct);
    }
}
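The deduplication idea behind this job (build a key from time, UID, query, and URL, then count the distinct keys) can be sketched on an in-memory list. The class name is hypothetical and the sample lines in the usage are made up. The sketch joins the fields with a tab, an assumption added here so that two records such as ("ab", "c") and ("a", "bc") cannot collapse into the same key.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DistinctRecordsSketch {

    /** Counts distinct (time, uid, query, url) combinations; short lines are skipped. */
    public static int countDistinct(List<String> lines) {
        Set<String> seen = new HashSet<>();
        for (String line : lines) {
            String[] f = line.split("\t", -1);
            if (f.length < 6) {
                continue;
            }
            // Fields 0, 1, 2 and 5 are time, uid, query and url.
            seen.add(String.join("\t", f[0], f[1], f[2], f[5]));
        }
        return seen.size();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "20111230000005\tu1\t[q]\t1\t1\thttp://example.com/",
                "20111230000005\tu1\t[q]\t2\t2\thttp://example.com/",
                "20111230000009\tu2\t[q]\t1\t1\thttp://example.com/");
        System.out.println(countDistinct(lines));
    }
}
```

In the MapReduce version the shuffle plays the role of the HashSet: each distinct key reaches the reducer exactly once, so counting reduce calls gives the same number.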

(4) Number of distinct UIDs

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountNotMoreUid {
    // Aggregated by the framework across tasks, unlike a static field.
    public enum Stat { DISTINCT_UIDS }

    public static class UidMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String uid = values[1];
            context.write(new Text(uid), new Text("1"));
        }
    }

    public static class UidReduc extends Reducer<Text, Text, Text, IntWritable> {
        private int index = 0; // running index within this reduce task

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // reduce() is called once per distinct UID.
            index++;
            context.getCounter(Stat.DISTINCT_UIDS).increment(1);
            context.write(new Text(key.toString()), new IntWritable(index));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "countnotnull");
        // Must name this class so that the right jar is shipped with the job.
        job.setJarByClass(CountNotMoreUid.class);
        job.setMapperClass(UidMap.class);
        job.setReducerClass(UidReduc.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotMoreUid"));
        job.waitForCompletion(true);
        long uids = job.getCounters().findCounter(Stat.DISTINCT_UIDS).getValue();
        System.out.println("Distinct UIDs: " + uids);
    }
}

(5) Query frequency ranking (the 50 most frequent terms)

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class CountTop50 {
    public static class TopMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private final Text text = new Text();
        private final LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            text.set(line[2]);
            context.write(text, one);
        }
    }

    public static class TopReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        private static final int N = 50;
        private static final Comparator<Map.Entry<String, Long>> BY_COUNT =
                Map.Entry.comparingByValue();
        // Min-heap ordered by count. A TreeMap keyed by count would overwrite
        // terms that share the same count, so ties would be lost.
        private final PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(BY_COUNT);

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0; // number of occurrences of this term
            for (LongWritable v : values) {
                sum += v.get();
            }
            heap.add(new AbstractMap.SimpleEntry<>(key.toString(), sum));
            // Keep only the 50 largest counts.
            if (heap.size() > N) {
                heap.poll();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            List<Map.Entry<String, Long>> top = new ArrayList<>(heap);
            top.sort(BY_COUNT.reversed());
            for (Map.Entry<String, Long> e : top) {
                context.write(new Text(e.getKey()), new LongWritable(e.getValue()));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(CountTop50.class);
        job.setJobName("Five");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReducer.class);
        // A single reducer is needed for the result to be a global top 50.
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountTop50"));
        job.waitForCompletion(true);
    }
}
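Keeping only the N largest counts while streaming over the keys is easy to get subtly wrong when several terms share the same count. As an illustration outside Hadoop, here is a minimal sketch of a bounded top-N selection with a min-heap; the class and method names are hypothetical, and the heap is a choice made for this sketch.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopNSketch {

    /** Returns up to n entries with the largest counts, in descending order of count. */
    public static List<Map.Entry<String, Long>> topN(Map<String, Long> counts, int n) {
        Comparator<Map.Entry<String, Long>> byCount = Map.Entry.comparingByValue();
        // Min-heap: the smallest retained count is always at the head.
        PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(byCount);
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            if (heap.size() > n) {
                heap.poll(); // evict the current minimum
            }
        }
        List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
        result.sort(byCount.reversed());
        return result;
    }

    public static void main(String[] args) {
        // Made-up counts for illustration.
        Map<String, Long> counts = new HashMap<>();
        counts.put("a", 3L);
        counts.put("b", 3L);
        counts.put("c", 1L);
        for (Map.Entry<String, Long> e : topN(counts, 2)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Because entries are stored as (term, count) pairs instead of being keyed by count, two terms with the same count both survive. The order among tied terms is not defined.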

(6) Number of users with more than 2 queries

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountQueriesGreater2 {
    // Aggregated by the framework across tasks, unlike a static field.
    public enum Stat { USERS_GT2 }

    public static class MyMaper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            context.write(new Text(str[1]), ONE);
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text uid, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            // uid is one user ID; counts holds one 1 per query by that user.
            int sum = 0;
            for (IntWritable i : counts) {
                sum += i.get();
            }
            if (sum > 2) {
                context.getCounter(Stat.USERS_GT2).increment(1);
            }
            //context.write(uid, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        // 1. Create the job
        Job job = Job.getInstance(conf, "six");
        // 2. Set the mapper class
        job.setMapperClass(MyMaper.class);
        // 3. Set the combiner class (optional)
        // job.setCombinerClass(MyReducer.class);
        // 4. Set the reducer class
        job.setReducerClass(MyReducer.class);
        // 5. Set the output key type
        job.setOutputKeyClass(Text.class);
        // 6. Set the output value type
        job.setOutputValueClass(IntWritable.class);
        // Set the class used to locate the job jar
        job.setJarByClass(CountQueriesGreater2.class);
        // 7. Set the input path
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        // 8. Set the output path
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreater2"));
        // 9. Run the job
        job.waitForCompletion(true);
        long total = job.getCounters().findCounter(Stat.USERS_GT2).getValue();
        System.out.println("Users with more than 2 queries: " + total);
    }
}

(7) Share of users with more than 2 queries

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountQueriesGreaterPro {
    /*
     * InputFormat -> FileInputFormat (subclass) -> TextInputFormat (subclass)
     * TextInputFormat is used by default. It
     *   1. validates the input path;
     *   2. reads the data one line at a time.
     *
     * Then map runs. Each map is a task (a Java process running in a JVM).
     * After map finishes, the optional combiner merges duplicate words within
     * one map; the value is the number of times the word occurred in that map.
     * Shuffle: the partitioner distributes the values from the different maps,
     * sorted by key.
     * The reducer receives all map output and merges the values that share a key.
     *
     * OutputFormat -> FileOutputFormat (subclass) -> TextOutputFormat (subclass)
     *   1. validates the output path;
     *   2. TextOutputFormat writes each record as key + "\t" + value + "\n".
     */
    // Text is a String type that can be written to a file.
    // IntWritable is an int type that can be written to a file.
    // The first two type parameters are the input key and value types;
    // the last two are the output key and value types.

    // Aggregated by the framework across tasks, unlike static fields.
    public enum Stat { USERS, USERS_GT2 }

    public static class MyMaper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        // key is the offset of the line; value is the line itself
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            // Emits one (uid, 1) pair per record.
            context.write(new Text(str[1]), ONE);
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text uid, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            // uid is one user ID; counts holds one 1 per query by that user.
            int sum = 0;
            for (IntWritable i : counts) {
                sum += i.get();
            }
            // reduce() runs once per user, so this counts all users. The share
            // is taken over users here, not over the number of records.
            context.getCounter(Stat.USERS).increment(1);
            if (sum > 2) {
                context.getCounter(Stat.USERS_GT2).increment(1);
            }
            context.write(uid, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("seven begin");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        // 1. Create the job
        Job job = Job.getInstance(conf, "seven");
        // 2. Set the mapper class
        job.setMapperClass(MyMaper.class);
        // 3. Set the combiner class (optional)
        // job.setCombinerClass(MyReducer.class);
        // 4. Set the reducer class
        job.setReducerClass(MyReducer.class);
        // 5. Set the output key type
        job.setOutputKeyClass(Text.class);
        // 6. Set the output value type
        job.setOutputValueClass(IntWritable.class);
        // Set the class used to locate the job jar
        job.setJarByClass(CountQueriesGreaterPro.class);
        // 7. Set the input path
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        // 8. Set the output path
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreaterPro"));
        // 9. Run the job
        job.waitForCompletion(true);
        long total1 = job.getCounters().findCounter(Stat.USERS_GT2).getValue();
        long total2 = job.getCounters().findCounter(Stat.USERS).getValue();
        System.out.println("total1=" + total1 + "\ttotal2=" + total2);
        double percentage = total2 == 0 ? 0.0 : 100.0 * total1 / total2;
        System.out.println("Share of users with more than 2 queries: " + percentage + "%");
        System.out.println("over");
    }
}
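To make the ratio in steps (6) and (7) concrete, here is a minimal in-memory sketch in plain Java. The class and method names are hypothetical. It assumes that "share of users" means users with more than 2 queries divided by the number of distinct users, which is one reading of the requirement; dividing by the number of records would give a different, smaller figure.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeavyUserRatioSketch {

    /** Fraction of distinct users (second field) that appear in more than two lines. */
    public static double ratio(List<String> lines) {
        Map<String, Integer> perUser = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t", -1);
            if (f.length < 2) {
                continue; // no user ID on this line
            }
            perUser.merge(f[1], 1, Integer::sum);
        }
        if (perUser.isEmpty()) {
            return 0.0;
        }
        long heavy = perUser.values().stream().filter(c -> c > 2).count();
        return (double) heavy / perUser.size();
    }

    public static void main(String[] args) {
        // Made-up sample: u1 has three queries, u2 has one.
        List<String> lines = Arrays.asList(
                "t1\tu1\t[a]", "t2\tu1\t[b]", "t3\tu1\t[c]", "t4\tu2\t[d]");
        System.out.println(ratio(lines));
    }
}
```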

(8) Share of clicks whose rank is within 10

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountRank {
    public static int sum1 = 0;
    public static int sum2 = 0;

    public static class MyMapper extends Mapper {
        @Override
        protected void map(Object key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            sum2++;
            String[] str = value.toString().split("\t");
            int rank = Integer.parseInt(str[3]);
            if (rank

[...]

            3) {
                Str = Str + key.toString() + "\n";
                i++;
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "count");
        } catch (IOException e) {
            e.printStackTrace();
        }
        job.setJarByClass(CountUidGreater3.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        try {
            FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountUidGreater3"));
            job.waitForCompletion(true);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("i: " + i);
        System.out.println(Str);
    }
}

5. Save the result of each step in 4 to HDFS. Prefix each HQL statement from step 4 with: insert overwrite directory 'location'

e.g.: insert overwrite directory '/sogou/data/1' select count(*) from sougou;

6. Import the files generated in 5 into HBase (one table) through the Java API.

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class HBaseImport {
    // Name of the table the reducer writes to
    private static String tableName = "test";
    // Connection configuration
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.136.100:9000/hbase");
        conf.set("hbase.master", "hdfs://192.168.136.100:60000");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    }

    public static class BatchMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass each line through unchanged, keyed by its byte offset.
            context.write(key, new Text(value.toString()));
        }
    }

    public static class BatchReducer extends TableReducer<LongWritable, Text, NullWritable> {
        private final byte[] family = Bytes.toBytes("info");

        @Override
        protected void reduce(LongWritable key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] splited = v2.toString().split("\t");
                // The first field of the line becomes the row key.
                String rowKey = splited[0];
                Put put = new Put(Bytes.toBytes(rowKey));
                // Put.add(family, qualifier, value) is the older client API;
                // on HBase 1.0 and later use put.addColumn(...) instead.
                put.add(family, Bytes.toBytes("raw"), Bytes.toBytes(v2.toString()));
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void imputil(String str)
            throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf, HBaseImport.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HBaseImport.class);
        FileInputFormat.setInputPaths(job, str);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(BatchMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(BatchReducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }

    public static void main(String[] args)
            throws ClassNotFoundException, IOException, InterruptedException {
        String[] str = {
            "hdfs://192.168.136.100:9000/sogou/data/1/000000_0",
            "hdfs://192.168.136.100:9000/sogou/data/2/000000_0",
            "hdfs://192.168.136.100:9000/sogou/data/3/000000_0",
            "hdfs://192.168.136.100:9000/sogou/data/4/000000_0",
            "hdfs://192.168.136.100:9000/sogou/data/5/000000_0",
            "hdfs://192.168.136.100:9000/sogou/data/6/000000_0",
            "hdfs://192.168.136.100:9000/sogou/data/9/000000_0",
            "hdfs://192.168.136.100:9000/sogou/data/10/000000_0"
        };
        // Run one import job per result file.
        for (String stri : str) {
            imputil(stri);
        }
    }
}

7. Query the results imported in 6 with HBase shell commands. Command: scan 'test'


