MultipleOutputs can be used with both mappers and reducers.
However, if the mapper output is written through MultipleOutputs instead of the context, the regular part files are still created but stay empty, and the records end up in a new file under your custom base path.
package com.srini.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DecryptorMapReduce extends Configured implements Tool {

    static class DecryptionMapper extends Mapper<Text, Text, Text, NullWritable> {

        MultipleOutputs<Text, NullWritable> multipleOutputs;

        // Initialize the multiple outputs
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<Text, NullWritable>(context);
        }

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Write using multipleOutputs instead of context.write;
            // "Output" is the base path of the custom output file
            multipleOutputs.write(key, NullWritable.get(), "Output");
        }

        // Close the multiple outputs, otherwise records may not be flushed to the files
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DecryptorMapReduce.class);
        job.setMapperClass(DecryptionMapper.class);
        // KeyValueTextInputFormat supplies Text keys and Text values,
        // which matches the mapper's input types
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // TransporterPathFilter is a custom PathFilter that selects the input files
        FileInputFormat.setInputPathFilter(job, TransporterPathFilter.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputDirRecursive(job, true);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Map-only job
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String... args) throws Exception {
        int res = ToolRunner.run(new DecryptorMapReduce(), args);
        System.exit(res);
    }
}
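If you want to avoid the empty part-m-nnnnn files that the job above still leaves in the output directory, one option (not part of the original code, just a sketch) is to register the output format lazily, so a regular part file is only created when something is actually written through the context. Assuming the same job object as above:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

// Replace job.setOutputFormatClass(TextOutputFormat.class) with a lazy registration;
// regular part files are then created only on context.write, while MultipleOutputs
// still produces the custom Output-m-nnnnn files.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

You can then run the job as usual, for example (the jar name here is just an illustration): hadoop jar decryptor.jar com.srini.test.DecryptorMapReduce <input dir> <output dir>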