Friday 10 July 2015

Writing mapper/reducer output using MultipleOutputs

 MultipleOutputs can be used with both mappers and reducers.
However, if the mapper writes its output through MultipleOutputs instead of context.write, the job still creates the regular part files, but they contain no output; the records end up in new files under your custom path.


package com.srini.test

public class DecryptorMapReduce extends Configured implements Tool {
   
    static class DecryptionMapper extends Mapper<Text, Text, Text, NullWritable>
    {
       
        MultipleOutputs<Text, NullWritable> multipleOutputs;
       
        # Initializing the multiple outputs
        protected void setup(Context context) throws IOException, InterruptedException {
                multipleOutputs = new MultipleOutputs<Text, NullWritable>(context);
            }
       
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException
        {
               // Write using mulitpleOutputs instead of context.write
                multipleOutputs.write(new Text(Key), NullWritable.get(),"Output");
         }
       
        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
              while (context.nextKeyValue()) {
                  map(context.getCurrentKey(), context.getCurrentValue(), context);
              }
            } finally {
              cleanup(context);
            }
          }
    }

    public int run(String[] args) throws Exception {
         Configuration conf = new Configuration();       
         Job job = Job.getInstance(conf);
       
         job.setMapperClass(DecryptionMapper.class);
         job.setInputFormatClass(TextInputFormat.class);
         job.setJarByClass(DecryptorMapReduce.class);
         FileInputFormat.setInputPathFilter(job, TransporterPathFilter.class);
         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileInputFormat.setInputDirRecursive(job, true);
         job.setOutputFormatClass(TextOutputFormat.class);
         FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
         job.setNumReduceTasks(0);
         job.waitForCompletion(true);
         return 0;
    }
   
    public static void main(String... args) throws Exception
    {
        int res = ToolRunner.run(new DecryptorMapReduce(), args);
        System.exit(res);
    }
       
}

No comments:

Post a Comment