Hadoop/MapReduce/Reducer

Overview

The Reducer is responsible for processing one or more Values that share a common Key (or Key Group). The reducer may perform a number of extraction and transformation functions on the Key and its associated Values, ultimately outputting zero, one, or many Key/Value pairs, whose types may be the same as, or different from, the input types.

'Identity' Reducer

With the new Hadoop API, reducers extend org.apache.hadoop.mapreduce.Reducer. This class defines an 'Identity' reduce function by default - every input Value is output with its corresponding Key.
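
Its reduce() implementation simply writes each input value back out with its key (this mirrors the default implementation in the Hadoop source):

  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for (VALUEIN value : values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }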

Examining the run() method, we can see the lifecycle of a reducer instance:

  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }
  • setup(Context) - Perform any setup for the reducer. The default implementation is a no-op method.
  • reduce(Key, Iterable<Value>, Context) - Perform a reduce operation on the given Key and associated Iterable of Values. The default implementation calls Context.write(Key, Value) for each Value in the Iterable.
  • cleanup(Context) - Perform any cleanup for the reducer. The default implementation is a no-op method.
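
As a sketch of how these hooks fit together, consider a hypothetical reducer that sums counts per key and reports how many key groups it processed in cleanup() (the class and field names here are illustrative, not part of the Hadoop API):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();
  private long keyGroups;

  @Override
  protected void setup(Context context) {
    // one-time initialisation per task attempt
    keyGroups = 0;
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context
                        ) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    result.set(sum);
    context.write(key, result);
    keyGroups++;
  }

  @Override
  protected void cleanup(Context context) {
    // one-time teardown, e.g. report how many key groups were processed
    System.err.println("processed " + keyGroups + " key groups");
  }
}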

Long running Reducer setup & JVM reuse

For long-running setup methods, see the note on the Mapper page - Long running Mapper setup & JVM reuse

A note about object reuse

As discussed on the Object reuse page, Hadoop will reuse the same Key and Value objects - meaning that when you iterate over the Values, the Value object will always be the same object reference; only its contents change with each iteration:

  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    Set<VALUEIN> valueSet = new TreeSet<VALUEIN>();

    for (VALUEIN value : values) {
      // BROKEN: every iteration adds the same (reused) object reference
      valueSet.add(value);
    }

    // assuming VALUEIN implements Comparable, valueSet.size() will always be 1:
    // the TreeSet ends up comparing the value to itself when determining where
    // to place it in the tree structure
  }
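
If you need to retain values beyond the current iteration, copy them before storing. A minimal sketch of the fix, assuming the value type is Text (org.apache.hadoop.io.WritableUtils.clone copies a Writable via serialization; for Text specifically, new Text(value) would work just as well):

  protected void reduce(Text key, Iterable<Text> values, Context context
                        ) throws IOException, InterruptedException {
    Set<Text> valueSet = new TreeSet<Text>();

    for (Text value : values) {
      // clone the reused instance so the set holds an independent object
      valueSet.add(WritableUtils.clone(value, context.getConfiguration()));
    }

    // valueSet now contains one entry per distinct value for this key
  }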

For those interested, look at the next() method of the org.apache.hadoop.mapreduce.ReduceContext.ValueIterator class - each call to this method eventually delegates to org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(), which populates the key and value instance variables rather than creating new instances.

Interestingly, this also means that the contents of the key progress as well - so if your Key class's compareTo method doesn't use all fields when comparing (or you have registered a custom group comparator), you can still extract the per-value contents of the key in your reducer. To demonstrate this, consider the following code snippet, where the Key is composed of a Text and a LongWritable and the Value is of type Text:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class Key implements WritableComparable<Key> {
  public Text t = new Text();
  public LongWritable timestamp = new LongWritable();

  public void readFields(DataInput in) throws IOException {
    t.readFields(in);
    timestamp.readFields(in);
  }

  public void write(DataOutput out) throws IOException {
    t.write(out);
    timestamp.write(out);
  }

  public int compareTo(Key other) {
    // this method is used to sort the keys in the shuffle stage
    int c = t.compareTo(other.t);
    if (c != 0) return c;
    return timestamp.compareTo(other.timestamp);
  }

  public int hashCode() {
    // the hash code must ensure all keys with the same t end up at the same
    // reducer (the default HashPartitioner partitions on hashCode), so this
    // method cannot incorporate the timestamp
    return t.hashCode();
  }
}


You can then define a group comparator that only considers the t component of Key:

public class KeyGroupComparator extends WritableComparator {
  public KeyGroupComparator() {
    super(Key.class, true);
  }
 
  public int compare(WritableComparable k1, WritableComparable k2) {
    // only use the text when grouping
    if (k1 instanceof Key && k2 instanceof Key) {
      return ((Key) k1).t.compareTo(((Key) k2).t);
    }
    return super.compare(k1, k2);
  }
}

In your Driver code, ensure you register the group comparator using org.apache.hadoop.mapreduce.Job.setGroupingComparatorClass(Class<? extends RawComparator>).
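
A minimal driver sketch (MyMapper and MyReducer are hypothetical placeholders; on older Hadoop releases use new Job(conf, ...) in place of Job.getInstance):

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "secondary sort example");
job.setMapperClass(MyMapper.class);   // hypothetical mapper emitting <Key, Text>
job.setReducerClass(MyReducer.class); // hypothetical reducer, as below
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Text.class);
job.setGroupingComparatorClass(KeyGroupComparator.class);
// no custom partitioner is needed: Key.hashCode() already partitions on t alone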

Now to the point - in your reduce method, the Values will be sorted by the Key's Text and LongWritable fields, but grouped by the Text field alone. A side effect of object reuse is that you can access the timestamp field for each value, and its underlying value will change with each Value iterated:

protected void reduce(Key key, Iterable<Text> values, Context cxt)
    throws IOException, InterruptedException {
  for (Text v : values) {
    // key.timestamp is updated on each iteration of this loop: it holds the
    // timestamp that was paired with this particular value
    long timestamp = key.timestamp.get();
  }
}
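
For concreteness, suppose the map phase emitted these (hypothetical) records:

  ("bar", 2) -> "c"
  ("foo", 3) -> "a"
  ("foo", 1) -> "b"

reduce is then called twice: once for the "bar" group with the single value "c", and once for the "foo" group, where the iteration yields "b" (key.timestamp reads 1) and then "a" (key.timestamp reads 3) - within a group the values arrive sorted by timestamp even though they share one reduce call.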