Package org.apache.avro.mapreduce
Class AvroMultipleOutputs
java.lang.Object
org.apache.avro.mapreduce.AvroMultipleOutputs
The AvroMultipleOutputs class simplifies writing Avro output data to multiple outputs.
Case one: writing to additional outputs other than the job's default output. Each additional output, or named output, may be configured with its own Schema and OutputFormat.
Case two: writing data to different files whose paths are provided by the user.
AvroMultipleOutputs supports counters; by default they are disabled. The counter group is the AvroMultipleOutputs class name, and the counter names are the same as the named output names. These counters count the number of records written to each named output.
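To make the counter layout concrete, here is a small in-memory model. It is not the Avro implementation: the class NamedOutputCounters and its methods are hypothetical, and the group string is assumed to be the fully qualified class name. It keeps one counter per named output and counts nothing while counters are disabled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of the named-output counters (not the Avro source).
 * All counters share one group named after the class; each named output
 * gets its own counter, and nothing is counted while counters are disabled.
 */
final class NamedOutputCounters {
    // Assumption: the group name is the fully qualified class name.
    static final String GROUP = "org.apache.avro.mapreduce.AvroMultipleOutputs";

    private final boolean enabled;
    private final Map<String, Long> counts = new LinkedHashMap<>();

    NamedOutputCounters(boolean enabled) {
        this.enabled = enabled;
    }

    /** Call once per record written to the given named output. */
    void recordWritten(String namedOutput) {
        if (enabled) {
            counts.merge(namedOutput, 1L, Long::sum);
        }
    }

    /** Count for one named output; 0 if nothing was recorded. */
    long get(String namedOutput) {
        return counts.getOrDefault(namedOutput, 0L);
    }
}
```

For example, with counters enabled, two writes to avro1 and one to avro2 leave counts of 2 and 1; with counters disabled, both stay at 0.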
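import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Job job = Job.getInstance();
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MyAvroMapper.class);
job.setReducerClass(MyAvroReducer.class);
...
Schema schema;
...
// Defines additional single output 'avro1' for the job
AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroKeyValueOutputFormat.class,
    keySchema, valueSchema);
// valueSchema can be set to null if only the key is written by the RecordWriter

// Defines additional output 'avro2' with a different schema for the job
AvroMultipleOutputs.addNamedOutput(job, "avro2", AvroKeyOutputFormat.class,
    schema, null);
...
job.waitForCompletion(true);
...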
Usage in Reducer:
import java.io.IOException;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyAvroReducer extends Reducer<K, V, T, NullWritable> {
  private AvroMultipleOutputs amos;

  @Override
  protected void setup(Context context) {
    ...
    amos = new AvroMultipleOutputs(context);
  }

  @Override
  protected void reduce(K key, Iterable<V> values, Context context)
      throws IOException, InterruptedException {
    ...
    amos.write("avro1", datum, NullWritable.get());
    amos.write("avro2", datum, NullWritable.get());
    amos.write("avro2", datum); // here the value is taken as NullWritable
    ...
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    amos.close();
    ...
  }
}
Constructor Summary
AvroMultipleOutputs(TaskInputOutputContext<?, ?, ?, ?> context)
    Creates and initializes multiple outputs support. It should be instantiated in the Mapper/Reducer setup method.
Method Summary
static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema)
    Adds a named output for the job.
static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema, Schema valueSchema)
    Adds a named output for the job.
void close()
    Closes all the opened outputs.
static boolean getCountersEnabled(job)
    Returns whether the counters for the named outputs are enabled.
static void setCountersEnabled(Job job, boolean enabled)
    Enables or disables counters for the named outputs.
long sync(String namedOutput, String baseOutputPath)
    Gets the record writer from the job's output format.
void write(Object key, Object value, String baseOutputPath)
    Write key and value to an output file name.
void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath)
    Write key and value to an output file name.
void write(String namedOutput, Object key)
    Write key and value to the namedOutput.
void write(String namedOutput, Object key, Object value)
    Write key and value to the namedOutput.
void write(String namedOutput, Object key, Object value, String baseOutputPath)
    Write key and value to baseOutputPath using the namedOutput.
Constructor Details
AvroMultipleOutputs
public AvroMultipleOutputs(TaskInputOutputContext<?, ?, ?, ?> context)
Creates and initializes multiple outputs support. It should be instantiated in the Mapper/Reducer setup method.
Parameters:
context - the TaskInputOutputContext object
Method Details
addNamedOutput
public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema)
Adds a named output for the job.
Parameters:
job - job to add the named output to
namedOutput - named output name; it must be a single word of letters and numbers only, and cannot be the word 'part', which is reserved for the default output.
outputFormatClass - OutputFormat class.
keySchema - Schema for the key.
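The naming rule for namedOutput can be expressed as a small check. This is a sketch that mirrors the rule as documented, not the Avro source: NamedOutputNameRule is a hypothetical helper, and treating "letters and numbers" as ASCII letters and digits is an assumption.

```java
/**
 * Hypothetical validator for the documented rule: a named output name must be
 * a single word of letters and digits only, and must not be "part" (reserved
 * for the default output). ASCII-only letters/digits is an assumption.
 */
final class NamedOutputNameRule {
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.equals("part")) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean asciiLetter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            boolean asciiDigit = c >= '0' && c <= '9';
            if (!asciiLetter && !asciiDigit) {
                return false; // underscores, dashes, spaces, etc. are rejected
            }
        }
        return true;
    }
}
```

Under this rule "avro1" is accepted, while "avro_1", "my output", and "part" are rejected.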
addNamedOutput
public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema, Schema valueSchema)
Adds a named output for the job.
Parameters:
job - job to add the named output to
namedOutput - named output name; it must be a single word of letters and numbers only, and cannot be the word 'part', which is reserved for the default output.
outputFormatClass - OutputFormat class.
keySchema - Schema for the key.
valueSchema - Schema for the value (used with AvroKeyValueOutputFormat; otherwise null).
setCountersEnabled
public static void setCountersEnabled(Job job, boolean enabled)
Enables or disables counters for the named outputs. The counter group is the AvroMultipleOutputs class name, and the counter names are the same as the named outputs. These counters count the number of records written to each named output. By default these counters are disabled.
Parameters:
job - job to enable counters for
enabled - whether the counters will be enabled.
getCountersEnabled
Returns whether the counters for the named outputs are enabled. By default these counters are disabled.
Parameters:
job - the job
Returns:
true if the counters are enabled, false if they are disabled.
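The enable/disable pair behaves like a boolean job setting that defaults to false. The sketch below models that contract with a plain Map standing in for the job configuration; CountersFlag and its configuration key are hypothetical, since the real key is not documented here.

```java
import java.util.Map;

/**
 * Hypothetical model of setCountersEnabled/getCountersEnabled: a boolean
 * stored in a configuration map and read back with a default of false.
 * The Map stands in for the job configuration; the key name is made up.
 */
final class CountersFlag {
    // Hypothetical key, not the one Avro actually uses.
    static final String KEY = "example.namedoutput.counters";

    static void setCountersEnabled(Map<String, String> conf, boolean enabled) {
        conf.put(KEY, Boolean.toString(enabled));
    }

    static boolean getCountersEnabled(Map<String, String> conf) {
        // An absent key means disabled, matching the documented default.
        return Boolean.parseBoolean(conf.getOrDefault(KEY, "false"));
    }
}
```

The design point is the default: a job that never calls setCountersEnabled reads back false.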
write
public void write(String namedOutput, Object key) throws IOException, InterruptedException
Write key to the namedOutput, using NullWritable as the value. The output path is a unique file generated for the namedOutput, for example {namedOutput}-(m|r)-{part-number}.
Parameters:
namedOutput - the named output name
key - the key; the value is NullWritable
Throws:
IOException
InterruptedException
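The file-name pattern {namedOutput}-(m|r)-{part-number} can be illustrated with a small helper. NamedOutputFileName is hypothetical; zero-padding the part number to five digits follows Hadoop's usual part-file convention and is an assumption here, and the real output format may also append an extension such as .avro.

```java
import java.util.Locale;

/**
 * Hypothetical helper that builds a name following the documented pattern
 * {namedOutput}-(m|r)-{part-number}. "m" marks map tasks, "r" reduce tasks.
 * Five-digit zero padding is an assumption borrowed from Hadoop convention.
 */
final class NamedOutputFileName {
    static String of(String baseName, boolean isMapTask, int partition) {
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, "%s-%s-%05d",
                baseName, isMapTask ? "m" : "r", partition);
    }
}
```

For instance, reduce task 3 writing to avro1 would produce avro1-r-00003 under these assumptions.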
write
public void write(String namedOutput, Object key, Object value) throws IOException, InterruptedException
Write key and value to the namedOutput. The output path is a unique file generated for the namedOutput, for example {namedOutput}-(m|r)-{part-number}.
Parameters:
namedOutput - the named output name
key - the key
value - the value
Throws:
IOException
InterruptedException
write
public void write(String namedOutput, Object key, Object value, String baseOutputPath) throws IOException, InterruptedException
Write key and value to baseOutputPath using the namedOutput.
Parameters:
namedOutput - the named output name
key - the key
value - the value
baseOutputPath - base output path to write the record to. Note: the framework generates a unique file name for the baseOutputPath.
Throws:
IOException
InterruptedException
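This overload is what enables case two: the task chooses a baseOutputPath per record, and each distinct baseOutputPath gets its own file(s). The sketch below only models that routing decision; BaseOutputPathRouting and its "category-" prefix rule are hypothetical. In a real task each record would instead go through amos.write(namedOutput, key, value, baseOutputPath).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical routing sketch for writing to user-chosen files: derive a
 * baseOutputPath from each record, then see how many records each distinct
 * path would receive. The "category-" naming rule is made up.
 */
final class BaseOutputPathRouting {
    /** Derives a base output path from a record's category (hypothetical rule). */
    static String baseOutputPathFor(String category) {
        return "category-" + category;
    }

    /** Counts how many records would be written under each base output path. */
    static Map<String, Integer> recordsPerPath(List<String> categories) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String category : categories) {
            counts.merge(baseOutputPathFor(category), 1, Integer::sum);
        }
        return counts;
    }
}
```

Records with categories a, b, a would land under two base paths, category-a (two records) and category-b (one record), each of which the framework turns into a unique file name.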
write
public void write(Object key, Object value, String baseOutputPath) throws IOException, InterruptedException
Write key and value to an output file name. Gets the record writer from the job's output format; the job's output format should be a FileOutputFormat.
Parameters:
key - the key
value - the value
baseOutputPath - base output path to write the record to. Note: the framework generates a unique file name for the baseOutputPath.
Throws:
IOException
InterruptedException
write
public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath) throws IOException, InterruptedException
Write key and value to an output file name. Gets the record writer from the job's output format; the job's output format should be a FileOutputFormat.
Parameters:
key - the key
value - the value
keySchema - key schema to use
valSchema - value schema to use
baseOutputPath - base output path to write the record to. Note: the framework generates a unique file name for the baseOutputPath.
Throws:
IOException
InterruptedException
sync
public long sync(String namedOutput, String baseOutputPath) throws IOException, InterruptedException
Forces the end of the current block, emitting a synchronization marker. Gets the record writer from the job's output format; the job's output format should be a FileOutputFormat. If the record writer implements Syncable, returns the current position as a value that may be passed to DataFileReader.seek(long); otherwise returns -1.
Parameters:
namedOutput - the namedOutput
baseOutputPath - base output path whose writer should be synced. Note: the framework generates a unique file name for the baseOutputPath.
Throws:
IOException
InterruptedException
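The return-value contract of sync (a seekable position when the writer supports syncing, otherwise -1) can be modeled with stand-in types. SyncSketch, Writer, and SyncableWriter below are hypothetical and are not the Avro or Hadoop interfaces.

```java
/**
 * Hypothetical model of the sync contract using stand-in types:
 * if the writer can sync, force a block boundary and return its position;
 * otherwise return -1.
 */
final class SyncSketch {
    /** Minimal record writer (stand-in). */
    interface Writer {
        void write(Object record);
    }

    /** Writer that can end the current block and report a seekable position. */
    interface SyncableWriter extends Writer {
        long sync();
    }

    /** Returns the sync position if supported, otherwise -1. */
    static long sync(Writer writer) {
        if (writer instanceof SyncableWriter) {
            return ((SyncableWriter) writer).sync();
        }
        return -1L;
    }
}
```

Returning -1 instead of throwing lets callers treat "not syncable" as an ordinary case and simply skip recording a seek position.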
close
public void close() throws IOException, InterruptedException
Closes all the opened outputs. This should be called from the cleanup method of the map/reduce task. If overridden, subclasses must invoke super.close() at the end of their close().
Throws:
IOException
InterruptedException
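Calling close() from cleanup matters because outputs are opened on demand and stay open until closed. The sketch below models that lifecycle and the super.close()-last rule for subclasses. WriterCache and AuditingWriterCache are hypothetical classes, not the Avro implementation; the assumption is one writer per base output path, opened on first use.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical model (not the Avro source): writers are opened lazily, one per
 * base output path, and remain open until close() is called.
 */
class WriterCache implements Closeable {
    private final Map<String, Closeable> writers = new LinkedHashMap<>();
    private final Function<String, Closeable> opener;

    WriterCache(Function<String, Closeable> opener) {
        this.opener = opener;
    }

    /** Returns the writer for a path, opening it only on first use. */
    Closeable writerFor(String baseOutputPath) {
        return writers.computeIfAbsent(baseOutputPath, opener);
    }

    int openCount() {
        return writers.size();
    }

    /** Closes every writer opened so far, in the order they were opened. */
    @Override
    public void close() throws IOException {
        for (Closeable writer : writers.values()) {
            writer.close();
        }
        writers.clear();
    }
}

/** Subclass that does its own cleanup first and calls super.close() last. */
class AuditingWriterCache extends WriterCache {
    private final List<String> log;

    AuditingWriterCache(Function<String, Closeable> opener, List<String> log) {
        super(opener);
        this.log = log;
    }

    @Override
    public void close() throws IOException {
        log.add("subclass cleanup"); // subclass-specific work first
        super.close();               // then close all opened writers
    }
}
```

If a subclass called super.close() first instead, any writes it performed during its own cleanup would target writers that had already been closed, which is why the documentation asks for super.close() at the end.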