2、得到统计结果后,再考虑排序(由于 MapReduce 框架的 shuffle 阶段会自动按照 key 进行排序,故可以自定义一个对象作为 key,并在其中实现比较逻辑来完成排序操作)


[hadoop@cloud01 ~]$ hadoop fs -cat /flow/output/part-r-00000
1380013800     180     200     380
(可以自己模拟数据)

[hadoop@cloud01 ~]$ hadoop fs -mkdir -p /flow/sort/input
[hadoop@cloud01 ~]$ hadoop fs -mkdir -p /flow/sort/output
[hadoop@cloud01 ~]$ hadoop fs -cp  /flow/output/part-r-00000 /flow/sort/input
[hadoop@cloud01 ~]$ hadoop fs -cat /flow/sort/input/part-r-00000
1380013800     180     200     380


对 /flow/sort/input/part-r-00000 中的数据(即 hadoop fs -cat /flow/sort/input/part-r-00000 的输出)按总流量(上行 + 下行)从大到小进行排序

// ---------- FlowBeans.java ----------
package serializable.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Traffic record for one mobile number, used as the MapReduce key so that the
 * framework's sort orders records by total flow (up + down), largest first.
 *
 * <p>Serialized field order is: mobile number (UTF), up flow (long), down flow
 * (long). {@link #write(DataOutput)} and {@link #readFields(DataInput)} must
 * stay in that same order.
 */
public class FlowBeans implements WritableComparable<FlowBeans> {

    private String mobileNumber;
    private long upFlow;
    private long downFlow;

    /** No-arg constructor; the fields are filled in later by {@link #readFields(DataInput)} or the setters. */
    public FlowBeans() {
    }

    /**
     * @param upFlow       upstream traffic
     * @param downFlow     downstream traffic
     * @param mobileNumber subscriber's mobile number
     */
    public FlowBeans(long upFlow, long downFlow, String mobileNumber) {
        this.mobileNumber = mobileNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    /**
     * Serialize the fields of this object to <code>out</code>.
     *
     * @param out <code>DataOutput</code> to serialize this object into.
     * @throws IOException if writing to <code>out</code> fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(mobileNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    /**
     * Deserialize the fields of this object from <code>in</code>.
     *
     * <p>For efficiency, implementations should attempt to re-use storage in the
     * existing object where possible.
     *
     * @param in <code>DataInput</code> to deserialize this object from.
     * @throws IOException if reading from <code>in</code> fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.mobileNumber = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }

    public String getMobileNumber() {
        return mobileNumber;
    }

    public void setMobileNumber(String mobileNumber) {
        this.mobileNumber = mobileNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return tab-separated {@code mobileNumber, upFlow, downFlow, upFlow + downFlow} */
    @Override
    public String toString() {
        return this.mobileNumber + "\t" + this.upFlow + "\t" + this.downFlow + "\t"
                + (this.upFlow + this.downFlow);
    }

    /**
     * Orders records by total flow, descending. Ties are broken by mobile
     * number and then by up flow, so that two different records never compare
     * as equal; records that compare as equal are grouped into a single
     * {@code reduce} call.
     *
     * @param o the record to compare against
     * @return negative if this record sorts first, positive if after, 0 if the records are equal
     */
    @Override
    public int compareTo(FlowBeans o) {
        long thisValue = this.downFlow + this.upFlow;
        long thatValue = o.downFlow + o.upFlow;
        if (thisValue != thatValue) {
            // Larger total sorts first.
            return thisValue > thatValue ? -1 : 1;
        }
        int byNumber = compareNullable(this.mobileNumber, o.mobileNumber);
        if (byNumber != 0) {
            return byNumber;
        }
        // Same total and same number: equal up flow implies equal down flow.
        return this.upFlow < o.upFlow ? -1 : (this.upFlow == o.upFlow ? 0 : 1);
    }

    /** Null-safe string comparison; {@code null} sorts before any non-null value. */
    private static int compareNullable(String a, String b) {
        if (a == null) {
            return b == null ? 0 : -1;
        }
        return b == null ? 1 : a.compareTo(b);
    }

    /** Two records are equal when mobile number, up flow and down flow all match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBeans)) {
            return false;
        }
        FlowBeans other = (FlowBeans) obj;
        return this.upFlow == other.upFlow
                && this.downFlow == other.downFlow
                && compareNullable(this.mobileNumber, other.mobileNumber) == 0;
    }

    /**
     * Field-based hash, consistent with {@link #equals(Object)}. Hadoop's default
     * partitioner routes keys by {@code hashCode()}, so it must not depend on
     * object identity.
     */
    @Override
    public int hashCode() {
        int result = mobileNumber == null ? 0 : mobileNumber.hashCode();
        result = 31 * result + (int) (upFlow ^ (upFlow >>> 32));
        result = 31 * result + (int) (downFlow ^ (downFlow >>> 32));
        return result;
    }
}

// ---------- FlowSortRunner.java ----------
package serializable.sort;

import java.io.IOException;
import java.net.URI;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import serializable.FlowSumRunner;

/**
 * Driver for the sort job: reads tab-separated flow records from
 * {@code /flow/sort/input}, uses {@link FlowBeans} as the map output key so the
 * shuffle sorts them, and writes the sorted records to {@code /flow/sort/output}.
 */
public class FlowSortRunner {

    private static final String HDFS_PATH = "hdfs://cloud01:9000";

    /** Parses each input line into a {@link FlowBeans} key; the value carries no data. */
    public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBeans, NullWritable> {

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one input line: {@code mobileNumber \t upFlow \t downFlow [\t ...]}
         * @param context sink for the emitted {@code (FlowBeans, NullWritable)} pair
         * @throws NumberFormatException if the up/down flow columns are not valid longs
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = StringUtils.split(value.toString(), "\t");
            if (values.length < 3) {
                // Blank or truncated line: count it instead of failing the whole task.
                context.getCounter("FlowSort", "MALFORMED_LINES").increment(1);
                return;
            }
            String mobileNumber = values[0];
            long upFlow = Long.parseLong(values[1]);
            long downFlow = Long.parseLong(values[2]);
            context.write(new FlowBeans(upFlow, downFlow, mobileNumber), NullWritable.get());
        }
    }

    /** Writes the already-sorted keys straight through to the output. */
    public static class FlowSortReducer
            extends Reducer<FlowBeans, NullWritable, FlowBeans, NullWritable> {

        /**
         * Emits the key once per grouped value, so records that compare as equal
         * are all preserved rather than collapsed into one output line.
         */
        @Override
        protected void reduce(FlowBeans k, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            for (NullWritable ignored : values) {
                context.write(k, NullWritable.get());
            }
        }
    }

    /**
     * Configures and submits the job, then exits with 0 on success and 1 on
     * job failure or on any exception during setup or submission.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        int exitCode = 1;
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowSortRunner.class);
            // NOTE(review): both calls set the job jar; the later setJar presumably
            // takes precedence — confirm and drop whichever one is not needed.
            job.setJar("flowSortJob.jar");

            job.setMapperClass(FlowSortMapper.class);
            job.setReducerClass(FlowSortReducer.class);

            job.setMapOutputKeyClass(FlowBeans.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(FlowBeans.class);
            // Was a second setOutputKeyClass call, which overwrote the key class
            // and left the value class unset.
            job.setOutputValueClass(NullWritable.class);

            Path inputPath = new Path(HDFS_PATH + "/flow/sort/input");
            Path outputDir = new Path(HDFS_PATH + "/flow/sort/output");
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputDir);

            // Remove any existing output directory before submitting the job.
            FileSystem fs = FileSystem.get(new URI(HDFS_PATH), conf);
            if (fs.exists(outputDir)) {
                fs.delete(outputDir, true);
            }

            exitCode = job.waitForCompletion(true) ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(exitCode);
    }
}


[hadoop@cloud01 ~]$ hadoop fs -cat /flow/sort/output/part-r-00000


[hadoop@cloud01 HDFSdemo]$ hadoop jar flowSortJob.jar serializable.sort.FlowSortRunner
15/02/25 21:09:54 INFO client.RMProxy: Connecting to ResourceManager at cloud01/
15/02/25 21:09:55 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/02/25 21:09:55 INFO input.FileInputFormat: Total input paths to process : 1
15/02/25 21:09:55 INFO mapreduce.JobSubmitter: number of splits:1
15/02/25 21:09:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1424927370429_0001
15/02/25 21:09:56 INFO impl.YarnClientImpl: Submitted application application_1424927370429_0001
15/02/25 21:09:56 INFO mapreduce.Job: The url to track the job: http://cloud01:8088/proxy/application_1424927370429_0001/
15/02/25 21:09:56 INFO mapreduce.Job: Running job: job_1424927370429_0001
15/02/25 21:10:04 INFO mapreduce.Job: Job job_1424927370429_0001 running in uber mode : false
15/02/25 21:10:04 INFO mapreduce.Job:  map 0% reduce 0%
15/02/25 21:10:09 INFO mapreduce.Job:  map 100% reduce 0%
15/02/25 21:10:15 INFO mapreduce.Job:  map 100% reduce 100%
15/02/25 21:10:15 INFO mapreduce.Job: Job job_1424927370429_0001 completed successfully
15/02/25 21:10:15 INFO mapreduce.Job: Counters: 49






