package mapred;

import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Regroups a vertex-per-line adjacency file by block id: each output line
 * holds one block id followed by all vertex records assigned to that block.
 */
public class Vertex2Block {

    /** Triple (vertex id, edge length, block id); helps to parse a neighbor list. */
    static class Elem {
        String id;
        String len;
        String bid;

        Elem(String id, String len, String bid) {
            this.id = id;
            this.len = len;
            this.bid = bid;
        }
    }

    /**
     * Reads the text file line by line and emits one (block id, vertex record) pair per line.
     * Input line: vid bid wid x y z \t vid1 len1 bid1 wid1  vid2 len2 bid2 wid2 ...
     * (the wid fields are dropped; x, y, z are carried through unchanged).
     */
    public static class V2BMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        private final IntWritable bid = new IntWritable();
        private final Text txt = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString(), "\t ");
            String vid = itr.nextToken();
            bid.set(Integer.parseInt(itr.nextToken()));
            itr.nextToken(); // skip the vertex's wid
            String x = itr.nextToken();
            String y = itr.nextToken();
            String z = itr.nextToken();

            // Collect the neighbor triples, dropping each neighbor's wid.
            StringBuilder sbuf = new StringBuilder();
            int count = 0;
            while (itr.hasMoreTokens()) {
                Elem ele = new Elem(itr.nextToken(), itr.nextToken(), itr.nextToken());
                sbuf.append(" " + ele.id + " " + ele.len + " " + ele.bid);
                itr.nextToken(); // skip the neighbor's wid
                count++;
            }
            txt.set(vid + " " + x + " " + y + " " + z + " " + count + sbuf);
            context.write(bid, txt);
        }
    }

    /** Concatenates all vertex records of one block into a single output line. */
    public static class V2BReducer extends Reducer<IntWritable, Text, NullWritable, Text> {
        private final Text txt = new Text();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sbuf = new StringBuilder(key.get() + "\t");
            for (Text val : values) {
                sbuf.append(val.toString());
                sbuf.append(" ");
            }
            txt.set(sbuf.toString());
            context.write(NullWritable.get(), txt);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Parse generic options before the Job copies the configuration,
        // so that -D overrides actually reach the job.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Vertex2Block <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Vertex2Block");
        job.setJarByClass(Vertex2Block.class);

        // Set mapper and its output key/value types.
        job.setMapperClass(V2BMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Set reducer and its output key/value types.
        job.setReducerClass(V2BReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(120); // set the number of reducers manually

        // Set input format: read the text file by lines.
        job.setInputFormatClass(TextInputFormat.class);

        // Set input and output paths (on HDFS).
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
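
/*
 * A minimal usage sketch (kept as a comment so the file stays compilable).
 * The jar name vertex2block.jar and the HDFS paths below are hypothetical:
 *
 *   hadoop jar vertex2block.jar mapred.Vertex2Block /data/vertices.txt /data/blocks
 *
 * Given a (hypothetical) input line
 *   7 3 0 1.0 2.0 3.0\t8 0.5 3 0 9 1.2 4 0
 * the mapper parses vid=7, bid=3, skips wid=0, keeps coordinates (1.0, 2.0, 3.0),
 * and reads two neighbor triples (8, 0.5, 3) and (9, 1.2, 4), so it emits
 *   key = 3, value = "7 1.0 2.0 3.0 2 8 0.5 3 9 1.2 4"
 * The reducer for block 3 then writes one line "3\t" followed by this record
 * and every other record whose bid is 3, separated by spaces.
 */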