Along the way I ran into values that could not be converted to int when reading the value, which I handled with a try/catch.
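A minimal sketch of that parse-with-fallback idea (SafeParse and parseOrDefault are names of my own, not part of the project; the same pattern appears inline in SortMapper below, where 6 is the fallback value):

// Hypothetical helper, not part of the original code: parse an int and fall
// back to a default when the value cannot be converted.
public class SafeParse {
    public static int parseOrDefault(String s, int fallback) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) {
            return fallback; // e.g. when the value field is empty or malformed
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("7", 6));   // 7
        System.out.println(parseOrDefault("abc", 6)); // 6, the fallback
    }
}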
The input, one key and value per line (separated by \t):
str2 2
str1 1
str3 3
str1 4
str4 7
str2 5
str3 9
gives the following result:
str1 1,4
str2 2,5
str3 3,9
str4 7
Here my map and reduce are separate classes, and I use a key I defined myself.
package com.kane.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Input records: str2 2, str1 1, str3 3, str1 4, str4 7, str2 5, str3 9
public class IntPair implements WritableComparable<IntPair> {

    private String firstKey; // e.g. str1
    private int secondKey;   // e.g. 1

    public String getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey = in.readUTF();
        secondKey = in.readInt();
    }

    // Comparison inside the key class itself; the driver registers
    // TextIntCompartor as the sort comparator, which takes precedence over this.
    @Override
    public int compareTo(IntPair o) {
        // int first = o.getFirstKey().compareTo(this.firstKey);
        // if (first != 0) {
        //     return first;
        // } else {
        //     return o.getSecondKey() - this.secondKey;
        // }
        return o.getFirstKey().compareTo(this.getFirstKey());
    }
}
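As a sanity check outside of Hadoop, here is a minimal sketch (a throwaway class of my own; IntPairRoundTrip is not part of the original project) that round-trips an IntPair through write() and readFields():

package com.kane.mr;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper class, only for checking serialization locally.
public class IntPairRoundTrip {
    public static void main(String[] args) throws IOException {
        IntPair original = new IntPair();
        original.setFirstKey("str1");
        original.setSecondKey(4);

        // Serialize the same way Hadoop would, via write().
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance via readFields().
        IntPair copy = new IntPair();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getFirstKey() + " " + copy.getSecondKey()); // prints: str1 4
    }
}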
package com.kane.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<Object, Text, IntPair, IntWritable> {

    public IntPair intPair = new IntPair();
    public IntWritable intWritable = new IntWritable(0);

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // With KeyValueTextInputFormat a line like "str1\t1" arrives as key=str1, value=1.
        // String[] values = value.toString().split("\t");
        System.out.println(value);
        int intValue;
        try {
            intValue = Integer.parseInt(value.toString());
        } catch (NumberFormatException e) {
            // Without the try/catch, some values could not be converted to int.
            intValue = 6;
        }
        intPair.setFirstKey(key.toString());
        intPair.setSecondKey(intValue);
        intWritable.set(intValue);
        context.write(intPair, intWritable); // e.g. key (str2, 2), value 2
    }
}
package com.kane.mr;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<IntPair, IntWritable, Text, Text> {

    @Override
    protected void reduce(IntPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Join the values grouped under the same first key with commas, e.g. "1,4".
        StringBuffer combineValue = new StringBuffer();
        Iterator<IntWritable> itr = values.iterator();
        while (itr.hasNext()) {
            int value = itr.next().get();
            if (combineValue.length() > 0) {
                combineValue.append(",");
            }
            combineValue.append(value);
        }
        context.write(new Text(key.getFirstKey()), new Text(combineValue.toString()));
    }
}
package com.kane.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartionTest extends Partitioner<IntPair, IntWritable> {

    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        // numPartitions is the number of reducers. Mask off the sign bit first,
        // then take the modulus (the parentheses matter: % binds tighter than &).
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
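To see what that expression does, a quick throwaway check (PartitionCheck is a class of my own; numPartitions = 2 is a made-up value, since the job above never changes the reducer count):

package com.kane.mr;

// Hypothetical scratch class: shows which partition each first key maps to
// when numPartitions is 2.
public class PartitionCheck {
    public static void main(String[] args) {
        int numPartitions = 2; // made-up value, for illustration only
        for (String k : new String[] {"str1", "str2", "str3", "str4"}) {
            int partition = (k.hashCode() & Integer.MAX_VALUE) % numPartitions;
            System.out.println(k + " -> partition " + partition);
        }
    }
}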
package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator: records whose first keys are equal go to the same reduce() call.
public class TextComparator extends WritableComparator {

    public TextComparator() {
        super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair o1 = (IntPair) a;
        IntPair o2 = (IntPair) b;
        return o1.getFirstKey().compareTo(o2.getFirstKey());
    }
}
package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator: orders by the first key, then by the second key within the same first key.
@SuppressWarnings("rawtypes")
public class TextIntCompartor extends WritableComparator {

    protected TextIntCompartor() {
        super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair o1 = (IntPair) a;
        IntPair o2 = (IntPair) b;
        int first = o1.getFirstKey().compareTo(o2.getFirstKey());
        if (first != 0) {
            return first;
        } else {
            return o1.getSecondKey() - o2.getSecondKey();
        }
    }
}
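To make the difference between the two comparators concrete, a small sketch (ComparatorDemo is my own class, placed in com.kane.mr so the protected constructor of TextIntCompartor is reachable):

package com.kane.mr;

// Hypothetical demo: the sort comparator distinguishes (str1,1) and (str1,4),
// while the grouping comparator treats them as the same group.
public class ComparatorDemo {
    public static void main(String[] args) {
        IntPair a = new IntPair();
        a.setFirstKey("str1");
        a.setSecondKey(1);

        IntPair b = new IntPair();
        b.setFirstKey("str1");
        b.setSecondKey(4);

        // Sort comparator: same first key, so it falls back to the second key (1 - 4 = -3).
        System.out.println(new TextIntCompartor().compare(a, b));

        // Grouping comparator: only looks at the first key, so it returns 0 and
        // both records reach one reduce() call, producing "str1 1,4".
        System.out.println(new TextComparator().compare(a, b));
    }
}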
package com.kane.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortMain {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Sort");
        job.setJarByClass(SortMain.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // input lines are key\tvalue
        job.setMapperClass(SortMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setSortComparatorClass(TextIntCompartor.class);
        job.setGroupingComparatorClass(TextComparator.class); // group by the first key
        job.setPartitionerClass(PartionTest.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // input path: the first argument after the class name on the hadoop jar command line
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Export the jar, copy it onto the Hadoop machine, put sort.txt into HDFS, and run it with:

hadoop jar KaneTest/sort.jar com.kane.mr.SortMain /kane/sort.txt /kane/output