Class AvroMultipleOutputs

java.lang.Object
org.apache.avro.mapred.AvroMultipleOutputs

public class AvroMultipleOutputs extends Object
The AvroMultipleOutputs class simplifies writing Avro output data to multiple outputs.

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own Schema and OutputFormat. A named output can be a single file or a multi file. The latter is referred to as a multi named output, which is an unbounded set of files all sharing the same Schema.

Case two: writing data to different files whose names are provided by the user.

AvroMultipleOutputs supports counters; by default they are disabled. The counters group is the AvroMultipleOutputs class name. The names of the counters are the same as the output names. They count the number of records written to each output name. For multi named outputs the name of the counter is the concatenation of the named output, an underscore '_' and the multiname.
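The counter naming rule described above can be sketched with plain Java. This is an illustrative model only: the class OutputCounterSketch and its methods are hypothetical names, not part of the Avro or Hadoop API, and the real counters are reported through the Hadoop Reporter rather than kept in a map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only; OutputCounterSketch is a hypothetical class, not Avro API.
class OutputCounterSketch {
    // Records written so far, keyed by counter name.
    private final Map<String, Long> counters = new LinkedHashMap<>();

    // Counter name for a single named output: the named output itself.
    static String counterName(String namedOutput) {
        return namedOutput;
    }

    // Counter name for a multi named output: namedOutput + '_' + multiName.
    static String counterName(String namedOutput, String multiName) {
        return namedOutput + "_" + multiName;
    }

    // Counts one written record under the given counter name.
    void increment(String counterName) {
        counters.merge(counterName, 1L, Long::sum);
    }

    // Returns the number of records counted so far, or 0 if none.
    long get(String counterName) {
        return counters.getOrDefault(counterName, 0L);
    }
}
```

Under this model, a record written to multi named output "avro2" with multiname "A" would be counted under "avro2_A", while a record written to the single named output "avro1" would be counted under "avro1".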

Usage pattern for job submission:

 JobConf job = new JobConf();

 FileInputFormat.setInputPaths(job, inDir);
 FileOutputFormat.setOutputPath(job, outDir);

 job.setMapperClass(MyAvroMapper.class);
 job.setReducerClass(HadoopReducer.class);
 job.set("avro.reducer", MyAvroReducer.class.getName());
 ...

 Schema schema;
 ...
 // Defines additional single output 'avro1' for the job
 AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroOutputFormat.class,
   schema);

 // Defines additional output 'avro2' that uses the default output schema
 AvroMultipleOutputs.addNamedOutput(job, "avro2",
   AvroOutputFormat.class,
   null); // if Schema is specified as null then the default output schema is used
 ...

 JobClient.runJob(job);
 ...
 

Usage in Reducer:


 public class MyAvroReducer extends
   AvroReducer<K, V, OUT> {
   private AvroMultipleOutputs amos;

   public void configure(JobConf conf) {
     ...
     amos = new AvroMultipleOutputs(conf);
   }

   public void reduce(K key, Iterable<V> values,
     AvroCollector<OUT> collector, Reporter reporter)
     throws IOException {
     ...
     amos.collect("avro1", reporter, datum);
     amos.getCollector("avro2", "A", reporter).collect(datum);
     // Creates a file 'testavrofile', writes data to it with schema "schema",
     // and takes other settings (such as the output class) from named output "avro1".
     amos.collect("avro1", reporter, schema, datum, "testavrofile");
     amos.collect("avro1", reporter, schema, datum, "testavrofile1");
     ...
   }

   public void close() throws IOException {
     amos.close();
     ...
   }

 }
 
  • Constructor Details

    • AvroMultipleOutputs

      public AvroMultipleOutputs(JobConf job)
      Creates and initializes multiple named outputs support. It should be instantiated in the Mapper/Reducer configure method.
      Parameters:
      job - the job configuration object
  • Method Details

    • getNamedOutputsList

      public static List<String> getNamedOutputsList(JobConf conf)
      Returns the list of named output (channel) names.
      Parameters:
      conf - job conf
      Returns:
      list of named output names
    • isMultiNamedOutput

      public static boolean isMultiNamedOutput(JobConf conf, String namedOutput)
      Returns whether a named output is a multi named output.
      Parameters:
      conf - job conf
      namedOutput - named output
      Returns:
      true if the named output is multi, false if it is single. If the named output is not defined, it returns false.
    • getNamedOutputFormatClass

      public static Class<? extends OutputFormat> getNamedOutputFormatClass(JobConf conf, String namedOutput)
      Returns the named output OutputFormat.
      Parameters:
      conf - job conf
      namedOutput - named output
      Returns:
      namedOutput OutputFormat
    • addNamedOutput

      public static void addNamedOutput(JobConf conf, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema schema)
      Adds a named output for the job.

      Parameters:
      conf - job conf to add the named output
      namedOutput - named output name. It must be a single word made up of letters and numbers only, and it cannot be the word 'part', which is reserved for the default output.
      outputFormatClass - OutputFormat class.
      schema - Schema to use for this namedOutput
    • addMultiNamedOutput

      public static void addMultiNamedOutput(JobConf conf, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Schema schema)
      Adds a multi named output for the job.

      Parameters:
      conf - job conf to add the named output
      namedOutput - named output name. It must be a single word made up of letters and numbers only, and it cannot be the word 'part', which is reserved for the default output.
      outputFormatClass - OutputFormat class.
      schema - Schema to use for this namedOutput
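The naming rule for named outputs (letters and numbers only, and never the reserved word 'part') can be sketched as a standalone check. This is a hedged illustration, not the library's own validation code: NamedOutputNameSketch and isValid are hypothetical names, and restricting the check to ASCII letters and digits is an assumption.

```java
// Illustrative check only; the library's own validation may differ in detail.
// NamedOutputNameSketch is a hypothetical class, not part of the Avro API.
class NamedOutputNameSketch {
    // Returns true if the name is non-empty, uses only ASCII letters and
    // digits (an assumption), and is not the reserved word "part".
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.equals("part")) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char ch = name.charAt(i);
            boolean letter = (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z');
            boolean digit = ch >= '0' && ch <= '9';
            if (!letter && !digit) {
                return false;
            }
        }
        return true;
    }
}
```

With this sketch, a name such as "avro1" passes, while "part", "avro_1" and the empty string are rejected. Running such a check before calling addNamedOutput or addMultiNamedOutput is one way to catch a bad name at job setup time.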
    • setCountersEnabled

      public static void setCountersEnabled(JobConf conf, boolean enabled)
      Enables or disables counters for the named outputs.

      By default these counters are disabled.

      The counters group is the AvroMultipleOutputs class name.

      The names of the counters are the same as the named outputs. For multi named outputs the name of the counter is the concatenation of the named output, an underscore '_' and the multiname.
      Parameters:
      conf - job conf in which to enable or disable the counters.
      enabled - indicates if the counters will be enabled or not.
    • getCountersEnabled

      public static boolean getCountersEnabled(JobConf conf)
      Returns whether the counters for the named outputs are enabled.

      By default these counters are disabled.

      The counters group is the AvroMultipleOutputs class name.

      The names of the counters are the same as the named outputs. For multi named outputs the name of the counter is the concatenation of the named output, an underscore '_' and the multiname.
      Parameters:
      conf - job conf to read the counters setting from.
      Returns:
      true if the counters are enabled, false if they are disabled.
    • getNamedOutputs

      public Iterator<String> getNamedOutputs()
      Returns an iterator with the defined named outputs.
      Returns:
      iterator with the defined named outputs
    • collect

      public void collect(String namedOutput, Reporter reporter, Object datum) throws IOException
      Collects a datum to the named output using the default schema.

      Parameters:
      namedOutput - the named output name
      reporter - the reporter
      datum - output data
      Throws:
      IOException - thrown if output collector could not be created
    • collect

      public void collect(String namedOutput, Reporter reporter, Schema schema, Object datum) throws IOException
      Collects a datum to the named output using a custom schema.

      Parameters:
      namedOutput - the named output name (this will be the output file name)
      reporter - the reporter
      schema - schema to use for this output
      datum - output data
      Throws:
      IOException - thrown if output collector could not be created
    • collect

      public void collect(String namedOutput, Reporter reporter, Schema schema, Object datum, String baseOutputPath) throws IOException
      Collects a datum to the named output using a custom schema and file name.

      Parameters:
      namedOutput - the named output name
      reporter - the reporter
      schema - schema to use for this output
      datum - output data
      baseOutputPath - output file name to use.
      Throws:
      IOException - thrown if output collector could not be created
    • getCollector

      public AvroCollector getCollector(String namedOutput, Reporter reporter) throws IOException
      Gets the output collector for a named output.

      Parameters:
      namedOutput - the named output name
      reporter - the reporter
      Returns:
      the output collector for the given named output
      Throws:
      IOException - thrown if output collector could not be created
    • getCollector

      public AvroCollector getCollector(String namedOutput, String multiName, Reporter reporter) throws IOException
      Gets the output collector for a multi named output.

      Parameters:
      namedOutput - the named output name
      multiName - the multiname
      reporter - the reporter
      Returns:
      the output collector for the given named output
      Throws:
      IOException - thrown if output collector could not be created
    • close

      public void close() throws IOException
      Closes all the opened named outputs.

      If overridden, subclasses must invoke super.close() at the end of their close() method.

      Throws:
      IOException - thrown if any of the MultipleOutput files could not be closed properly.
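The rule for overriding close() can be sketched with plain Java stand-ins. The classes OutputsSketch and BufferingOutputsSketch are hypothetical and only model the call order. They are not the Avro classes, and the try/finally wrapper is one possible way to guarantee the super.close() call, not something the documentation prescribes.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a class that owns named outputs; not the Avro class.
class OutputsSketch implements Closeable {
    // Records the order in which cleanup steps ran.
    final List<String> log = new ArrayList<>();

    @Override
    public void close() throws IOException {
        log.add("base outputs closed");
    }
}

// Hypothetical subclass that has its own cleanup to do before the base class.
class BufferingOutputsSketch extends OutputsSketch {
    @Override
    public void close() throws IOException {
        try {
            // Subclass-specific cleanup runs first.
            log.add("subclass buffers flushed");
        } finally {
            // super.close() is invoked last, even if the cleanup above throws.
            super.close();
        }
    }
}
```

Calling close() on a BufferingOutputsSketch runs the subclass cleanup first and the base cleanup last, which matches the ordering that the close() description asks for.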