Class AvroMultipleOutputs

java.lang.Object
org.apache.avro.mapreduce.AvroMultipleOutputs

public class AvroMultipleOutputs extends Object
The AvroMultipleOutputs class simplifies writing Avro output data to multiple outputs.

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own Schema and OutputFormat.

Case two: writing data to different files whose paths are provided by the user.

AvroMultipleOutputs supports counters; they are disabled by default. The counters group is the AvroMultipleOutputs class name. The counter names are the same as the output names, and each counter counts the number of records written to that output.

Usage pattern for job submission:

 Job job = Job.getInstance();

 FileInputFormat.addInputPath(job, inDir);
 FileOutputFormat.setOutputPath(job, outDir);

 job.setMapperClass(MyAvroMapper.class);
 job.setReducerClass(MyAvroReducer.class);
 ...

 Schema keySchema, valueSchema, schema;
 ...
 // Defines additional single output 'avro1' for the job.
 // valueSchema can be null if only the key is to be written to the file by the RecordWriter.
 AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroKeyValueOutputFormat.class,
   keySchema, valueSchema);

 // Defines additional output 'avro2' with different schema for the job
 AvroMultipleOutputs.addNamedOutput(job, "avro2",
   AvroKeyOutputFormat.class,
   schema, null);
 ...

 job.waitForCompletion(true);
 ...
 

Usage in Reducer:


 public class MyAvroReducer extends
   Reducer<K, V, T, NullWritable> {
 private AvroMultipleOutputs amos;


 public void setup(Context context) {
 ...
 amos = new AvroMultipleOutputs(context);
 }

 public void reduce(K key, Iterable<V> values, Context context)
 throws IOException, InterruptedException {
 ...
 amos.write("avro1", datum, NullWritable.get());
 amos.write("avro2", datum, NullWritable.get());
 amos.write("avro3", datum); // here the value is taken as NullWritable
 ...
 }

 public void cleanup(Context context) throws IOException, InterruptedException {
 amos.close();
 ...
 }

 }
 
  • Constructor Details

    • AvroMultipleOutputs

      public AvroMultipleOutputs(TaskInputOutputContext<?,?,?,?> context)
      Creates and initializes multiple outputs support. It should be instantiated in the Mapper/Reducer setup method.
      Parameters:
      context - the TaskInputOutputContext object
  • Method Details

    • addNamedOutput

      public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema)
      Adds a named output for the job.

      Parameters:
      job - job to add the named output to
      namedOutput - name of the named output. It must be a single word made up of letters and numbers only, and it cannot be the word 'part', which is reserved for the default output.
      outputFormatClass - OutputFormat class.
      keySchema - Schema for the Key
    • addNamedOutput

      public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema keySchema, Schema valueSchema)
      Adds a named output for the job.

      Parameters:
      job - job to add the named output to
      namedOutput - name of the named output. It must be a single word made up of letters and numbers only, and it cannot be the word 'part', which is reserved for the default output.
      outputFormatClass - OutputFormat class.
      keySchema - Schema for the Key
      valueSchema - Schema for the Value (used with AvroKeyValueOutputFormat; otherwise null)
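
The naming rule for namedOutput can be sketched as a small check. This is only an illustration, not the library's own validation code: the class `NamedOutputNames` and its method `isValid` are hypothetical, and reading "letters and numbers" as ASCII letters and digits is an assumption.

```java
// Hypothetical helper illustrating the namedOutput naming rule described above.
// It is not part of Avro or Hadoop.
final class NamedOutputNames {
    private NamedOutputNames() {}

    /** Returns true if the name is non-empty, alphanumeric (ASCII assumed), and not "part". */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        if (name.equals("part")) {
            return false; // reserved for the job's default output
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean letter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            boolean digit = c >= '0' && c <= '9';
            if (!letter && !digit) {
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions, `isValid("avro1")` accepts the name, while `isValid("part")` and `isValid("avro-1")` reject it.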
    • setCountersEnabled

      public static void setCountersEnabled(Job job, boolean enabled)
      Enables or disables counters for the named outputs. The counters group is the AvroMultipleOutputs class name. The names of the counters are the same as the named outputs. These counters count the number of records written to each output name. By default these counters are disabled.
      Parameters:
      job - job for which to enable counters
      enabled - indicates if the counters will be enabled or not.
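
The counter naming scheme can be modeled with a small stand-in. This is a sketch under stated assumptions, not the framework's counter implementation: `NamedOutputCounters` is a hypothetical class, and the group name is assumed to be the fully qualified AvroMultipleOutputs class name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the framework's counters, used only to show the naming scheme.
final class NamedOutputCounters {
    // Assumption: the group is the fully qualified AvroMultipleOutputs class name.
    static final String GROUP = "org.apache.avro.mapreduce.AvroMultipleOutputs";

    private final boolean enabled;
    private final Map<String, Map<String, Long>> groups = new HashMap<>();

    NamedOutputCounters(boolean enabled) {
        this.enabled = enabled;
    }

    /** Called once per record written; does nothing when counters are disabled. */
    void recordWritten(String namedOutput) {
        if (!enabled) {
            return;
        }
        groups.computeIfAbsent(GROUP, g -> new HashMap<>())
              .merge(namedOutput, 1L, Long::sum);
    }

    /** Returns the count for a group and counter name, or 0 if nothing was counted. */
    long get(String group, String name) {
        Map<String, Long> counters = groups.getOrDefault(group, Collections.emptyMap());
        return counters.getOrDefault(name, 0L);
    }
}
```

In this model, each named output gets one counter in the single group, and a disabled instance never records anything, which mirrors the default described above.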
    • getCountersEnabled

      public static boolean getCountersEnabled(JobContext job)
      Returns whether the counters for the named outputs are enabled. By default these counters are disabled.
      Parameters:
      job - the job
      Returns:
      true if the counters are enabled, false if they are disabled.
    • write

      public void write(String namedOutput, Object key) throws IOException, InterruptedException
      Write key to the namedOutput; the value is written as NullWritable. The output path is a unique file generated for the namedOutput, for example {namedOutput}-(m|r)-{part-number}.
      Parameters:
      namedOutput - the named output name
      key - the key; the value is NullWritable
      Throws:
      IOException
      InterruptedException
    • write

      public void write(String namedOutput, Object key, Object value) throws IOException, InterruptedException
      Write key and value to the namedOutput. The output path is a unique file generated for the namedOutput, for example {namedOutput}-(m|r)-{part-number}.
      Parameters:
      namedOutput - the named output name
      key - the key
      value - the value
      Throws:
      IOException
      InterruptedException
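
The {namedOutput}-(m|r)-{part-number} pattern can be expanded by hand to see what file names to expect. This is a minimal sketch: `NamedOutputFileNames` is a hypothetical helper, and the five-digit zero padding is an assumption modeled on Hadoop's usual part-file names. Any file extension added by the output format is left out.

```java
import java.util.Locale;

// Hypothetical helper showing how the {namedOutput}-(m|r)-{part-number} pattern expands.
// It is not part of Avro or Hadoop.
final class NamedOutputFileNames {
    private NamedOutputFileNames() {}

    /**
     * Builds a file name for a named output.
     * Assumption: the part number is zero-padded to five digits.
     */
    static String fileName(String namedOutput, boolean mapTask, int partNumber) {
        String taskType = mapTask ? "m" : "r"; // "m" for map tasks, "r" for reduce tasks
        return String.format(Locale.ROOT, "%s-%s-%05d", namedOutput, taskType, partNumber);
    }
}
```

With these assumptions, `fileName("avro1", false, 0)` yields `avro1-r-00000` and `fileName("avro2", true, 12)` yields `avro2-m-00012`.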
    • write

      public void write(String namedOutput, Object key, Object value, String baseOutputPath) throws IOException, InterruptedException
      Write key and value to baseOutputPath using the namedOutput.
      Parameters:
      namedOutput - the named output name
      key - the key
      value - the value
      baseOutputPath - base-output path to write the record to. Note: the framework generates a unique filename for the baseOutputPath.
      Throws:
      IOException
      InterruptedException
    • write

      public void write(Object key, Object value, String baseOutputPath) throws IOException, InterruptedException
      Write key and value to an output file name. Gets the record writer from the job's output format, which should be a FileOutputFormat.
      Parameters:
      key - the key
      value - the value
      baseOutputPath - base-output path to write the record to. Note: the framework generates a unique filename for the baseOutputPath.
      Throws:
      IOException
      InterruptedException
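
Because baseOutputPath is supplied by the caller, it can be derived from the record itself to route records into separate directories. The sketch below is illustrative only: `BaseOutputPaths`, `forCategory`, and the `/data` suffix are hypothetical names chosen for this example, not anything defined by Avro or Hadoop.

```java
// Hypothetical helper that derives a baseOutputPath from a record attribute.
// It is not part of Avro or Hadoop.
final class BaseOutputPaths {
    private BaseOutputPaths() {}

    /**
     * Maps a category to a relative base path such as "clicks/data".
     * Characters other than ASCII letters, digits, '_' and '-' are replaced with '_'
     * so the category cannot introduce extra path segments.
     */
    static String forCategory(String category) {
        String safe = category.replaceAll("[^A-Za-z0-9_-]", "_");
        return safe + "/data";
    }
}
```

A reducer could then pass `BaseOutputPaths.forCategory(category)` as the baseOutputPath argument of write, leaving the framework to generate the unique file name for that path.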
    • write

      public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath) throws IOException, InterruptedException
      Write key and value to an output file name using the given schemas. Gets the record writer from the job's output format, which should be a FileOutputFormat.
      Parameters:
      key - the key
      value - the value
      keySchema - key Schema to use
      valSchema - value Schema to use
      baseOutputPath - base-output path to write the record to. Note: the framework generates a unique filename for the baseOutputPath.
      Throws:
      IOException
      InterruptedException
    • sync

      public long sync(String namedOutput, String baseOutputPath) throws IOException, InterruptedException
      Forces the end of the current block, emitting a synchronization marker. Gets the record writer from the job's output format, which should be a FileOutputFormat. If the record writer implements Syncable, returns the current position as a value that may be passed to DataFileReader.seek(long); otherwise returns -1.
      Parameters:
      namedOutput - the namedOutput
      baseOutputPath - base-output path to write the record to. Note: the framework generates a unique filename for the baseOutputPath.
      Throws:
      IOException
      InterruptedException
    • close

      public void close() throws IOException, InterruptedException
      Closes all the opened outputs. This should be called from the cleanup method of the map/reduce task. If overridden, subclasses must invoke super.close() at the end of their close().
      Throws:
      IOException
      InterruptedException
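
The override rule for close() can be illustrated with plain classes. This is a sketch of the calling order only: `BaseOutputs` and `FlushingOutputs` are hypothetical stand-ins, not AvroMultipleOutputs itself, and the try/finally is one possible way to make sure the superclass call always runs last.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a class whose close() releases all opened outputs.
class BaseOutputs {
    final List<String> log = new ArrayList<>();

    public void close() {
        log.add("base closed");
    }
}

// Hypothetical subclass that does its own cleanup, then delegates to super.close().
class FlushingOutputs extends BaseOutputs {
    @Override
    public void close() {
        try {
            log.add("subclass flushed"); // subclass-specific cleanup comes first
        } finally {
            super.close(); // delegate last, even if the cleanup above throws
        }
    }
}
```

Calling `close()` on a `FlushingOutputs` instance records the subclass cleanup first and the base close second.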