MapReduce

MapReduce example

  • Problems

    • Need to replace in-memory wordCount with a disk-based hashmap

    • Need to scale reduce:

      • Need to partition the intermediate data (wordCount) from first phase.

      • Shuffle the partitions to the appropriate machines in second phase.

  • Partition: Partition sorted output of map phase according to hash value. Write output to local disk.

    • Why local disk rather than GFS (the job's final input and output both live in GFS):

      • Writing to GFS can be too slow for intermediate data.

      • Intermediate data does not need replication. If it is lost, it is simply recomputed.

  • External sorting: Sort each partition with external sorting.

  • Send: Send sorted partitioned data to corresponding reduce machines.

  • Merge sort: Merge sorted partitioned data from different machines by merge sort.
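The partition and merge steps above can be sketched as follows. This is a minimal illustration, not the implementation of any particular framework: the names ShuffleSketch, partitionFor, and mergeSortedRuns are hypothetical, String.hashCode stands in for whatever hash function the system uses, and in-memory lists stand in for the sorted partition files that would live on local disk.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class ShuffleSketch {

    // Pick the reduce machine for a key. Every mapper computes the same
    // index for the same key, so all values of a key meet on one reducer.
    static int partitionFor(String key, int numReducers) {
        // floorMod keeps the index in [0, numReducers) even when hashCode is negative.
        return Math.floorMod(key.hashCode(), numReducers);
    }

    // One heap entry: the current smallest key of a run plus the rest of that run.
    private static final class Head {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }
    }

    // K-way merge: each input run is already sorted (one run per map machine).
    static List<String> mergeSortedRuns(List<List<String>> runs) {
        PriorityQueue<Head> heap =
                new PriorityQueue<>(Comparator.comparing((Head h) -> h.key));
        for (List<String> run : runs) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }

        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.key);
            // Refill the heap from the run that just gave up its head.
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }
}
```

Because each run is already sorted, the merge only ever compares the current head of each run, which is why the reduce side can stream through data that is far larger than memory.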

// first phase
// define wordCount as Multiset;
for each document in documentSet 
{
    T = tokenize( document )
    for each token in T
    {
        wordCount[token]++;
    }
}
display( wordCount )

// second phase
// define totalWordCount as Multiset;
for each wordCount received from first phase
{
    multisetAdd( totalWordCount, wordCount )
}
display( totalWordCount )

Interface structures

  • In order for mapping, reducing, partitioning, and shuffling to seamlessly work together, we need to agree on a common structure for the data being processed.

Phase     Input              Output

map       <K1, V1>           list(<K2, V2>)

reduce    <K2, list(V2)>     list(<K3, V3>)
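The two signatures can be written down as generic Java interfaces. This is a sketch under assumptions: MapReduceTypes, Mapper, Reducer, WORD_MAPPER, and SUM_REDUCER are illustrative names, not the API of any real framework, and Map.Entry is used as a stand-in for a key/value pair.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MapReduceTypes {

    // map: <K1, V1> -> list(<K2, V2>)
    interface Mapper<K1, V1, K2, V2> {
        List<Map.Entry<K2, V2>> map(K1 key, V1 value);
    }

    // reduce: <K2, list(V2)> -> list(<K3, V3>)
    interface Reducer<K2, V2, K3, V3> {
        List<Map.Entry<K3, V3>> reduce(K2 key, List<V2> values);
    }

    // Word count instance: K1 = filename, V1 = file content, K2 = word, V2 = 1.
    static final Mapper<String, String, String, Integer> WORD_MAPPER = (filename, content) -> {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : content.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    };

    // Word count instance: K3 = word, V3 = total count for that word.
    static final Reducer<String, Integer, String, Integer> SUM_REDUCER = (word, counts) -> {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        out.add(new SimpleEntry<>(word, sum));
        return out;
    };
}
```

The only coupling between the two phases is the type pair <K2, V2>: whatever the mapper emits is exactly what the reducer must accept, which is what lets the framework partition and shuffle the data without knowing what it means.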

  • Examples

    • Split: The input to your application must be structured as a list of (key/value) pairs, list(<k1, v1>). The input format for processing multiple files is usually list(<String filename, String file_content>). The input format for processing one large file, such as a log file, is list(<Integer line_number, String log_event>).

    • Map: The list of (key/value) pairs is broken up, and each individual pair <k1, v1> is processed by calling the mapper's map function. In practice, the mapper often ignores the key k1. The mapper transforms each <k1, v1> pair into a list of <k2, v2> pairs. For word counting, the mapper takes <String filename, String file_content> and promptly ignores filename. Its output is a list of <String word, Integer count>; in the simplest form it emits <String word, Integer 1> once per occurrence, so the same word can appear in the list many times.

    • Reduce: The output of all the mappers is aggregated into one giant list of <k2, v2> pairs. All pairs sharing the same k2 are grouped together into a new (key/value) pair, <k2, list(v2)>. The framework asks the reducer to process each of these aggregated (key/value) pairs individually.
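The grouping that turns list(<k2, v2>) into <k2, list(v2)> can be sketched in a few lines. GroupByKey and group are hypothetical names, and this version runs in memory on one machine, whereas a real framework does the grouping with sorted files spread across machines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupByKey {

    // Collect every v2 that shares a k2 into one list:
    // list(<k2, v2>) -> one <k2, list(v2)> entry per distinct key.
    static <K extends Comparable<K>, V> Map<K, List<V>> group(List<Map.Entry<K, V>> pairs) {
        // TreeMap keeps the keys sorted, mirroring the sorted order reducers see.
        Map<K, List<V>> grouped = new TreeMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }
}
```

For word counting, the pairs <to, 1>, <be, 1>, <to, 1> become <be, [1]> and <to, [1, 1]>, and the reducer is then called once for each of those two entries.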

MapReduce steps

  1. Input: The system reads the input file from GFS.

  2. Split: The system splits the data across different machines, for example by hash value (SHA1, MD5).

  3. Map: Each map task works on a split of data. The mapper outputs intermediate data.

  4. Transmission: The system-provided shuffle process reorganizes the data so that all {Key, Value} pairs associated with a given key go to the same machine, to be processed by Reduce.

  5. Reduce: Intermediate data of the same key goes to the same reducer.

  6. Output: The reducer output is stored in GFS.
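The six steps above can be traced in a single-process simulation of word counting. This is only a sketch: LocalWordCountJob and run are hypothetical names, each document is treated as one split, in-memory maps stand in for the reduce machines, and no file system or network is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LocalWordCountJob {

    static Map<String, Integer> run(List<String> documents, int numReducers) {
        // One key -> values table per simulated reduce machine.
        List<Map<String, List<Integer>>> reducerInputs = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            reducerInputs.add(new TreeMap<>());
        }

        // Steps 1-3: read the input, treat each document as one split,
        // and let its map task emit <word, 1> for every token.
        for (String document : documents) {
            for (String word : document.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // Step 4: shuffle. The hash of the key decides which reducer
                // receives the pair, so equal keys always land together.
                int r = Math.floorMod(word.hashCode(), numReducers);
                reducerInputs.get(r)
                             .computeIfAbsent(word, k -> new ArrayList<>())
                             .add(1);
            }
        }

        // Step 5: each reducer sums the values of every key it owns.
        Map<String, Integer> output = new TreeMap<>();
        for (Map<String, List<Integer>> input : reducerInputs) {
            for (Map.Entry<String, List<Integer>> entry : input.entrySet()) {
                int sum = 0;
                for (int value : entry.getValue()) {
                    sum += value;
                }
                // Step 6: store the reducer output.
                output.put(entry.getKey(), sum);
            }
        }
        return output;
    }
}
```

Calling run with the documents "to be or not to be" and "be quick" gives the same totals for any positive number of reducers, because the partitioning only changes where a key is summed, not which values are summed.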

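import java.util.Iterator;

public class WordCount
{
    // Minimal stand-in for the collector supplied by the framework,
    // so that the example compiles on its own.
    public interface OutputCollector<K, V>
    {
        void collect( K key, V value );
    }

    public static class Map
    {
        // key is the file location (unused); value is the file content
        public void map( String key, String value, OutputCollector<String, Integer> output )
        {
            // split on runs of whitespace so tabs, newlines, and repeated spaces
            // do not produce bogus tokens
            String[] tokens = value.trim().split( "\\s+" );

            for( String word : tokens )
            {
                // blank content still yields one empty token; skip it
                if( word.isEmpty() ) continue;

                // emit <word, 1>; the collector will batch operations writing to disk
                output.collect( word, 1 );
            }
        }
    }

    public static class Reduce
    {
        // key is a word; values are all the counts emitted for that word
        public void reduce( String key, Iterator<Integer> values, OutputCollector<String, Integer> output )
        {
            int sum = 0;
            while ( values.hasNext() )
            {
                sum += values.next();
            }
            // emit <word, total count>
            output.collect( key, sum );
        }
    }
}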
