public class AvroMultipleOutputs extends Object
Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own Schema and OutputFormat.
Case two: writing data to different files whose names are provided by the user.
AvroMultipleOutputs supports counters; by default they are disabled. The counters group is the AvroMultipleOutputs class name. The names of the counters are the same as the output names. Each counter holds the number of records written to its output name.
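The counter behaviour described above can be modelled in a few lines of plain Java. This is only a sketch of the bookkeeping, not the Avro or Hadoop implementation: the class `NamedOutputCounters`, its methods, and the package name in `GROUP` are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of per-output counters; not library code.
class NamedOutputCounters {
    // The counters group is the AvroMultipleOutputs class name
    // (the package shown here is an assumption).
    static final String GROUP = "org.apache.avro.mapreduce.AvroMultipleOutputs";

    private final boolean enabled;
    private final Map<String, Long> counts = new LinkedHashMap<>();

    // Counters are disabled unless explicitly enabled.
    NamedOutputCounters(boolean enabled) {
        this.enabled = enabled;
    }

    // Called once per record written to the given output name.
    void recordWritten(String outputName) {
        if (enabled) {
            counts.merge(outputName, 1L, Long::sum);
        }
    }

    // Number of records counted so far for the given output name.
    long get(String outputName) {
        return counts.getOrDefault(outputName, 0L);
    }
}
```

In a real job the equivalent switch is `AvroMultipleOutputs.setCountersEnabled(job, true)`, listed in the method summary.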
Usage pattern for job submission:
Job job = new Job();
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MyAvroMapper.class);
job.setReducerClass(MyAvroReducer.class);
...
Schema schema;
...
// Defines additional single output 'avro1' for the job.
// valueSchema can be null if only the key is written to the file by the RecordWriter.
AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroKeyValueOutputFormat.class, keySchema, valueSchema);
// Defines additional output 'avro2' with a different schema for the job.
AvroMultipleOutputs.addNamedOutput(job, "avro2", AvroKeyOutputFormat.class, schema, null);
...
job.waitForCompletion(true);
...
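Names such as 'avro1' and 'avro2' must follow the rule given for the namedOutput parameter of addNamedOutput: letters and numbers only, and not the word 'part'. The check below is a minimal sketch of that rule; `NamedOutputNames.isValid` is a hypothetical helper, not a method of the library, and treating only ASCII letters and digits as valid is an assumption.

```java
// Hypothetical helper mirroring the documented naming rule; not library code.
class NamedOutputNames {
    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        // 'part' is reserved for the default output.
        if (name.equals("part")) {
            return false;
        }
        // Assumption: only ASCII letters and digits count as valid characters.
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean letter = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
            boolean digit = c >= '0' && c <= '9';
            if (!letter && !digit) {
                return false;
            }
        }
        return true;
    }
}
```

Validating names before job submission surfaces a bad name earlier than waiting for addNamedOutput to reject it.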
Usage in Reducer:
public class MyAvroReducer extends Reducer<K, V, T, NullWritable> {
  private AvroMultipleOutputs amos;

  public void setup(Context context) {
    ...
    amos = new AvroMultipleOutputs(context);
  }

  public void reduce(K key, Iterable<V> values, Context context) throws IOException, InterruptedException {
    ...
    amos.write("avro1", datum, NullWritable.get());
    amos.write("avro2", datum, NullWritable.get());
    amos.write("avro3", datum); // here the value is taken as NullWritable
    ...
  }

  public void cleanup(Context context) throws IOException, InterruptedException {
    amos.close();
    ...
  }
}
Constructor and Description |
---|
AvroMultipleOutputs(TaskInputOutputContext<?,?,?,?> context) Creates and initializes multiple outputs support; it should be instantiated in the Mapper/Reducer setup method. |
Modifier and Type | Method and Description |
---|---|
static void | addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema) Adds a named output for the job. |
static void | addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema, Schema valueSchema) Adds a named output for the job. |
void | close() Closes all the opened outputs. |
static boolean | getCountersEnabled(JobContext job) Returns if the counters for the named outputs are enabled or not. |
static void | setCountersEnabled(Job job, boolean enabled) Enables or disables counters for the named outputs. |
long | sync(String namedOutput, String baseOutputPath) Gets the record writer from job's output format. |
void | write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath) Write key value to an output file name. |
void | write(Object key, Object value, String baseOutputPath) Write key value to an output file name. |
void | write(String namedOutput, Object key) Write key and value to the namedOutput. |
void | write(String namedOutput, Object key, Object value) Write key and value to the namedOutput. |
void | write(String namedOutput, Object key, Object value, String baseOutputPath) Write key and value to baseOutputPath using the namedOutput. |
public AvroMultipleOutputs(TaskInputOutputContext<?,?,?,?> context)
Parameters:
context - the TaskInputOutputContext object
public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema)
Parameters:
job - job to add the named output to
namedOutput - named output name; it has to be a word made of letters and numbers only, and cannot be the word 'part', as that is reserved for the default output.
outputFormatClass - OutputFormat class.
keySchema - Schema for the key
public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema, Schema valueSchema)
Parameters:
job - job to add the named output to
namedOutput - named output name; it has to be a word made of letters and numbers only, and cannot be the word 'part', as that is reserved for the default output.
outputFormatClass - OutputFormat class.
keySchema - Schema for the key
valueSchema - Schema for the value (used in case of AvroKeyValueOutputFormat, or null)
public static void setCountersEnabled(Job job, boolean enabled)
Enables or disables counters for the named outputs. The counters group is the AvroMultipleOutputs class name. The names of the counters are the same as the named outputs. These counters count the number of records written to each output name. By default these counters are disabled.
Parameters:
job - job to enable counters for
enabled - indicates if the counters will be enabled or not.
public static boolean getCountersEnabled(JobContext job)
Parameters:
job - the job
public void write(String namedOutput, Object key) throws IOException, InterruptedException
Parameters:
namedOutput - the named output name
key - the key; the value is NullWritable
Throws:
IOException
InterruptedException
public void write(String namedOutput, Object key, Object value) throws IOException, InterruptedException
Parameters:
namedOutput - the named output name
key - the key
value - the value
Throws:
IOException
InterruptedException
public void write(String namedOutput, Object key, Object value, String baseOutputPath) throws IOException, InterruptedException
Parameters:
namedOutput - the named output name
key - the key
value - the value
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath.
Throws:
IOException
InterruptedException
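The note about unique filenames can be illustrated with a small sketch. It assumes the usual Hadoop FileOutputFormat convention of appending the task type ('m' or 'r') and a five-digit partition number to the base path; the helper `UniqueFileNames.uniqueFile` and its parameters are hypothetical and only mimic that convention.

```java
import java.util.Locale;

// Hypothetical helper that mimics the conventional naming scheme; not library code.
class UniqueFileNames {
    // baseOutputPath: the path passed to write(...)
    // isMapTask: true for a map task, false for a reduce task
    // partition: the task's partition number
    // extension: file extension including the dot, or "" for none
    static String uniqueFile(String baseOutputPath, boolean isMapTask, int partition, String extension) {
        String taskType = isMapTask ? "m" : "r";
        return String.format(Locale.ROOT, "%s-%s-%05d%s", baseOutputPath, taskType, partition, extension);
    }
}
```

Under this assumption, two reducers writing to the same baseOutputPath produce distinct files, so the tasks do not overwrite each other's output.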
public void write(Object key, Object value, String baseOutputPath) throws IOException, InterruptedException
Parameters:
key - the key
value - the value
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath.
Throws:
IOException
InterruptedException
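For case two (writing to files chosen by the user), one common approach is to derive the baseOutputPath from the record itself. The sketch below is an illustration only: `OutputPaths.basePathFor`, the idea of grouping by a category string, and the "records" file prefix are all assumptions, not part of the library.

```java
import java.util.Locale;

// Hypothetical helper that builds a baseOutputPath from a record attribute.
class OutputPaths {
    // Maps a free-form category to "<sanitized-category>/records".
    static String basePathFor(String category) {
        String safe = category.trim()
                .toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9]+", "_"); // keep the path component simple
        return safe + "/records";
    }
}
```

The resulting string would then be passed as the baseOutputPath argument, for example `amos.write(key, value, OutputPaths.basePathFor(category))`, where `category` is a hypothetical field taken from the record.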
public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath) throws IOException, InterruptedException
Parameters:
key - the key
value - the value
keySchema - key schema to use
valSchema - value schema to use
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath.
Throws:
IOException
InterruptedException
public long sync(String namedOutput, String baseOutputPath) throws IOException, InterruptedException
Parameters:
namedOutput - the named output
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath.
Throws:
IOException
InterruptedException
public void close() throws IOException, InterruptedException
Closes all the opened outputs. If overridden, subclasses must invoke super.close() at the end of their close().
Throws:
IOException
InterruptedException
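The requirement to call super.close() last can be shown with a plain-Java sketch. `BaseOutputs` and `BufferingOutputs` are hypothetical stand-ins, not AvroMultipleOutputs itself; the log list exists only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a class whose close() releases the opened outputs.
class BaseOutputs {
    final List<String> log = new ArrayList<>();

    void close() {
        log.add("base closed");
    }
}

// A subclass that has its own cleanup to perform before the outputs are closed.
class BufferingOutputs extends BaseOutputs {
    @Override
    void close() {
        log.add("subclass flushed"); // subclass cleanup happens first
        super.close();               // delegate to the parent as the last step
    }
}
```

Calling the parent last means the subclass can still use the outputs during its own cleanup, since they are closed only afterwards.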
Copyright © 2009–2016 The Apache Software Foundation. All rights reserved.