博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce实现排序功能
阅读量:7112 次
发布时间:2019-06-28

本文共 5754 字,大约阅读时间需要 19 分钟。

期间遇到了 value 的值无法转换成 int 型的问题,我采用 try catch 解决

str2 2

str1 1
str3 3
str1 4
str4 7
str2 5
str3 9

输入文件中每行的 key 和 value 用 \t(制表符)隔开,得到的结果如下:

str1 1,4 

str2 2,5

str3 3,9

str4 7

我这里map,reduce都是单独出来的类,用了自己定义的key

package com.kane.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import com.j_spaces.obf.fi;
//str2 2
//str1 1
//str3 3
//str1 4
//str4 7
//str2 5
//str3 9
/**
 * Composite key holding a string part and an integer part
 * (for an input record such as {@code str1 1}: {@code "str1"} and {@code 1}).
 *
 * <p>The wire format is the string written with {@code writeUTF} followed by
 * the integer written with {@code writeInt}; {@link #readFields} reads them
 * back in the same order.
 */
public class IntPair implements WritableComparable<IntPair> {

    /** String component of the key, e.g. {@code str1}. */
    private String firstKey;

    /** Integer component of the key, e.g. {@code 1}. */
    private int secondKey;

    /** @return the string component of this key */
    public String getFirstKey() {
        return this.firstKey;
    }

    /** @param firstKey the new string component */
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    /** @return the integer component of this key */
    public int getSecondKey() {
        return this.secondKey;
    }

    /** @param secondKey the new integer component */
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    /**
     * Serializes both components: string first, then integer.
     *
     * @param out sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.firstKey);
        out.writeInt(this.secondKey);
    }

    /**
     * Restores both components in the order {@link #write} emitted them.
     *
     * @param in source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.firstKey = in.readUTF();
        this.secondKey = in.readInt();
    }

    /**
     * Compares by the string component only, with the operands swapped
     * (the other key's string is compared against this one's), which yields
     * a reversed natural-order result. The integer component is ignored.
     *
     * @param o the key to compare against
     * @return negative, zero or positive per the reversed string comparison
     */
    @Override
    public int compareTo(IntPair o) {
        final String theirs = o.getFirstKey();
        final String ours = this.getFirstKey();
        return theirs.compareTo(ours);
    }
}

package com.kane.mr;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that turns each (key, value) input record into an
 * {@code (IntPair(key, value), value)} output pair.
 *
 * <p>The incoming value is parsed as a decimal integer. Records whose value
 * cannot be parsed are skipped and counted instead of being emitted with a
 * made-up number, so malformed input never pollutes the result.
 */
public class SortMapper extends Mapper<Object, Text, IntPair, IntWritable> {

    /** Reused output key; overwritten for every record. */
    public IntPair intPair = new IntPair();

    /** Reused output value; overwritten for every record. */
    public IntWritable intWritable = new IntWritable(0);

    /**
     * Emits one composite-key pair per well-formed input record.
     *
     * @param key     the record key (its {@code toString()} becomes the first key part)
     * @param value   the record value, expected to hold a decimal integer
     * @param context the task context used to emit output and report counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        int intValue;
        try {
            // Trim first: surrounding whitespace (e.g. a trailing '\r') makes parseInt fail.
            intValue = Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            // The value is not an integer (for example it is empty). Skip the
            // record and make the problem visible through a job counter rather
            // than substituting an arbitrary number.
            context.getCounter("SortMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }
        intPair.setFirstKey(key.toString());
        intPair.setSecondKey(intValue);
        intWritable.set(intValue);
        context.write(intPair, intWritable); // e.g. key (str2, 2) -> value 2
    }
}

package com.kane.mr;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that concatenates all integer values of one group into a single
 * comma-separated string, e.g. values 1 and 4 become {@code "1,4"}.
 */
public class SortReducer extends Reducer<IntPair, IntWritable, Text, Text> {

    /**
     * Writes one output record per group: the key's string part and the
     * group's values joined by commas in iteration order (no trailing comma).
     *
     * @param key     the composite key of the group
     * @param values  the integer values belonging to the group
     * @param context the task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(IntPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        for (IntWritable current : values) {
            // Put the separator before every element except the first, so the
            // result never ends with a dangling comma.
            if (joined.length() > 0) {
                joined.append(',');
            }
            joined.append(current.get());
        }
        context.write(new Text(key.getFirstKey()), new Text(joined.toString()));
    }
}

package com.kane.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioner that routes records by the hash of the key's string part, so
 * all records sharing the same first key go to the same partition.
 */
public class PartionTest extends Partitioner<IntPair, IntWritable> {

    /**
     * Computes the partition index for a record.
     *
     * @param key           the composite key; only its string part is hashed
     * @param value         the record value (not used)
     * @param numPartitions the number of partitions (reduce tasks)
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        // Clear the sign bit first, THEN take the modulus. Without the
        // parentheses '%' binds tighter than '&', which evaluates
        // hash & (Integer.MAX_VALUE % numPartitions) and leaves some
        // partitions permanently unused.
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Comparator over {@link IntPair} keys that looks at the string part only,
 * so keys with equal string parts compare as equal regardless of their
 * integer parts.
 */
public class TextComparator extends WritableComparator {

    /** Registers {@link IntPair} as the key class and asks the base class to create key instances. */
    public TextComparator() {
        super(IntPair.class, true);
    }

    /**
     * Compares two keys by the natural order of their string parts.
     *
     * @param a first key; must be an {@link IntPair}
     * @param b second key; must be an {@link IntPair}
     * @return the result of comparing the first key's string part to the second's
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final IntPair left = (IntPair) a;
        final IntPair right = (IntPair) b;
        final String leftKey = left.getFirstKey();
        final String rightKey = right.getFirstKey();
        return leftKey.compareTo(rightKey);
    }
}

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
@SuppressWarnings("rawtypes")
public class TextIntCompartor extends WritableComparator{
protected TextIntCompartor() {
super(IntPair.class,true);
}
@Override
public int compare(WritableComparable a,  WritableComparable b) {
IntPair o1=(IntPair)a;
IntPair o2=(IntPair)b;
int first=o1.getFirstKey().compareTo(o2.getFirstKey());
if (first!=0) {
return first;
}
else {
return o1.getSecondKey()-o2.getSecondKey();
}
}
}

package com.kane.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class SortMain {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
   if (otherArgs.length != 2) {
     System.err.println("Usage: wordcount <in> <out>");
     System.exit(2);
   }
   Job job = new Job(conf, "Sort");
   job.setJarByClass(SortMain.class);
   job.setInputFormatClass(KeyValueTextInputFormat.class);//设定输入的格式是key(中间\t隔开)value
   job.setMapperClass(SortMapper.class);
   //job.setCombinerClass(IntSumReducer.class);
   job.setReducerClass(SortReducer.class);
   
   job.setMapOutputKeyClass(IntPair.class);
   job.setMapOutputValueClass(IntWritable.class);
   
   job.setSortComparatorClass(TextIntCompartor.class);
   job.setGroupingComparatorClass(TextComparator.class);//以key 进行group by
   job.setPartitionerClass(PartionTest.class);
   
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(Text.class);
   
   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//输入參数,相应hadoop jar 相应类执行时在后面加的第一个參数
   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//输出參数
   System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

导出 jar 包放到 hadoop 下,然后将 sort.txt 放入到 hdfs 中,再用 hadoop jar KaneTest/sort.jar com.kane.mr.SortMain /kane/sort.txt /kane/output 命令运行

你可能感兴趣的文章
【JavaScript】JavaScript Array 对象(数组)
查看>>
github 上有趣又实用的前端项目(持续更新,欢迎补充)
查看>>
opencv python 直方图均衡化
查看>>
HotFrameLearning 热门框架学习(前言)
查看>>
git团队开发流程
查看>>
【Under-the-hood-ReactJS-Part6】React源码解读
查看>>
深入理解css之vertical-align
查看>>
Laravel事件
查看>>
matlab绘制peano(皮亚诺)曲线和koch(科赫曲线,雪花曲线)分形曲线
查看>>
使用pipenv代替virtualenv管理python包
查看>>
Docker零基础入门指南(四):Docker容器使用
查看>>
React 深入系列4:组件的生命周期
查看>>
Mybatis之设计模式之迭代器模式
查看>>
房间号生成器
查看>>
CentOS 6.8 安装vsftpd
查看>>
js设计模式 --- 装饰设计模式
查看>>
Flask源代码阅读笔记(一)——应用启动
查看>>
IOS精品源码,仿探探UIButton封装iOS提示弹框迅速引导页自定义导航栏
查看>>
setState的一个Synthetic Event Warning
查看>>
通读Python官方文档之wsgiref(未完成)
查看>>