# Map reduce

* [MapReduce example](https://github.com/DreamOfTheRedChamber/system-design-interviews/blob/master/bigData/scenario_searchengine-todo.md#mapreduce-example)
  * [Interface structures](https://github.com/DreamOfTheRedChamber/system-design-interviews/blob/master/bigData/scenario_searchengine-todo.md#interface-structures)
  * [MapReduce steps](https://github.com/DreamOfTheRedChamber/system-design-interviews/blob/master/bigData/scenario_searchengine-todo.md#mapreduce-steps)
* [References](https://github.com/DreamOfTheRedChamber/system-design-interviews/blob/master/bigData/scenario_searchengine-todo.md#references)

### MapReduce example

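* Problems with the naive two-phase approach below:
  * The in-memory `wordCount` may not fit in memory, so it needs to be replaced with a disk-based hash map.
  * The reduce phase needs to scale out:
    * Partition the intermediate data (`wordCount`) produced by the first phase.
    * Shuffle each partition to the machine responsible for it in the second phase.
* Partition: Partition the map output by the hash value of each key, and write the partitions to the mapper's local disk.
  * Why local disk rather than GFS (the final input and output both live in GFS):
    * Writing to GFS is slower, because GFS replicates every write across several machines.
    * Intermediate data does not need replication: if a machine fails, its map output can simply be recomputed.
* External sorting: Sort each partition by key, using external sorting when the partition does not fit in memory.
* Send: Send each sorted partition to its corresponding reduce machine.
* Merge sort: Each reduce machine merges the sorted partitions it receives from different map machines with a merge sort.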
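
The partition and sort steps above can be sketched as follows. This is a minimal in-memory sketch, assuming the partition is chosen by `hash(key) mod R` (a common default) where `R` is the number of reducers; the class and method names (`MapSidePartitioner`, `partitionFor`, `partitionAndSort`) are hypothetical, and the in-memory `sort` stands in for the external sort a real system would use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapSidePartitioner {
    // Assign a key to one of numReducers partitions (hash(key) mod R).
    // floorMod keeps the result non-negative even when hashCode() is negative.
    public static int partitionFor(String key, int numReducers) {
        return Math.floorMod(key.hashCode(), numReducers);
    }

    // Split one mapper's <word, count> output into numReducers partitions, then sort
    // each partition by key. A real system would spill these to local disk and use
    // an external sort for partitions that do not fit in memory.
    public static List<List<Map.Entry<String, Integer>>> partitionAndSort(
            List<Map.Entry<String, Integer>> mapOutput, int numReducers) {
        List<List<Map.Entry<String, Integer>>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> pair : mapOutput) {
            partitions.get(partitionFor(pair.getKey(), numReducers)).add(pair);
        }
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            partition.sort(Map.Entry.comparingByKey());
        }
        return partitions;
    }
}
```

Because every mapper uses the same partition function, all pairs for a given word end up in the same partition number on every machine, which is what lets a single reducer own that word.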

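```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseWordCount
{
    // first phase: count the words in the documents assigned to one machine
    public static Map<String, Integer> countWords( List<String> documentSet )
    {
        Map<String, Integer> wordCount = new HashMap<>(); // in-memory multiset: word -> count
        for ( String document : documentSet )
        {
            for ( String token : document.split( "\\s+" ) ) // tokenize( document )
            {
                if ( !token.isEmpty() )
                {
                    wordCount.merge( token, 1, Integer::sum ); // wordCount[token]++
                }
            }
        }
        return wordCount;
    }

    // second phase: add up the partial counts received from every first-phase machine
    public static Map<String, Integer> mergeCounts( List<Map<String, Integer>> partialCounts )
    {
        Map<String, Integer> totalWordCount = new HashMap<>();
        for ( Map<String, Integer> wordCount : partialCounts )
        {
            // multisetAdd( totalWordCount, wordCount )
            wordCount.forEach( ( word, count ) -> totalWordCount.merge( word, count, Integer::sum ) );
        }
        return totalWordCount;
    }
}
```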
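
The "Merge sort" step, where a reduce machine combines sorted partitions from several map machines, can be sketched as a k-way merge with a min-heap. This is an illustrative sketch, assuming each incoming partition is already a key-sorted list held in memory; `ReduceSideMerger` and `merge` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class ReduceSideMerger {
    // The current smallest entry of one sorted run, plus an iterator over the rest of that run.
    private static final class Head {
        final Map.Entry<String, Integer> entry;
        final Iterator<Map.Entry<String, Integer>> rest;

        Head(Map.Entry<String, Integer> entry, Iterator<Map.Entry<String, Integer>> rest) {
            this.entry = entry;
            this.rest = rest;
        }
    }

    // K-way merge: combine runs that are each sorted by key (one run per map machine)
    // into one key-sorted list. The heap always holds at most one entry per run.
    public static List<Map.Entry<String, Integer>> merge(List<List<Map.Entry<String, Integer>>> sortedRuns) {
        PriorityQueue<Head> heap =
                new PriorityQueue<>((a, b) -> a.entry.getKey().compareTo(b.entry.getKey()));
        for (List<Map.Entry<String, Integer>> run : sortedRuns) {
            Iterator<Map.Entry<String, Integer>> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<Map.Entry<String, Integer>> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.entry);
            // Refill the heap from the run that just supplied the smallest entry.
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }
}
```

Since the merged output is sorted, all entries for one key are adjacent, so the reducer can sum them in a single pass instead of building a second hash map.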

#### Interface structures

* For mapping, partitioning, shuffling, and reducing to work together seamlessly, every stage must agree on a common key/value structure for the data passing between them.

| Phase  | Input            | Output           |
| ------ | ---------------- | ---------------- |
| map    | `<K1, V1>`       | `list(<K2, V2>)` |
| reduce | `<K2, list(V2)>` | `list(<K3, V3>)` |
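
The type signatures in the table can be written down as Java interfaces. This is a minimal sketch: `MapFn`, `ReduceFn`, and the word-count instances are hypothetical names and are not Hadoop's actual `Mapper`/`Reducer` API, which uses its own `Writable` types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapReduceTypes {
    // map: <K1, V1> -> list(<K2, V2>)
    public interface MapFn<K1, V1, K2, V2> {
        List<Map.Entry<K2, V2>> map(K1 key, V1 value);
    }

    // reduce: <K2, list(V2)> -> list(<K3, V3>)
    public interface ReduceFn<K2, V2, K3, V3> {
        List<Map.Entry<K3, V3>> reduce(K2 key, List<V2> values);
    }

    // Word count: K1 = filename (ignored), V1 = file content, K2 = word, V2 = 1.
    public static final MapFn<String, String, String, Integer> WORD_COUNT_MAPPER = (filename, content) -> {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : content.split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    };

    // K3 = word, V3 = total number of occurrences of that word.
    public static final ReduceFn<String, Integer, String, Integer> WORD_COUNT_REDUCER = (word, counts) -> {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return List.of(Map.entry(word, sum));
    };
}
```

The only coupling between the two functions is that the mapper's output types `K2, V2` must match the reducer's input types, which is exactly the contract the table expresses.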

* Examples
  * Split: The input to your application must be structured as a list of key/value pairs, `list(<k1, v1>)`. For processing multiple files, the input format is usually `list(<String filename, String file_content>)`. For processing one large file, such as a log file, it is `list(<Integer line_number, String log_event>)`.
  * Map: The list of key/value pairs is broken up, and each individual pair `<k1, v1>` is processed by calling the mapper's map function. In practice, the mapper often ignores the key `k1`. The mapper transforms each `<k1, v1>` pair into a list of `<k2, v2>` pairs. For word counting, the mapper takes `<String filename, String file_content>` and ignores `filename`. It could output a list of `<String word, Integer count>`, but in the simplest form it outputs `<String word, Integer 1>` for every occurrence, so the list contains repeated entries.
  * Reduce: The output of all the mappers is aggregated into one giant list of `<k2, v2>` pairs. All pairs sharing the same `k2` are grouped together into a new key/value pair, `<k2, list(v2)>`. The framework asks the reducer to process each of these grouped pairs individually.
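
The grouping described in the Reduce example, turning `list(<k2, v2>)` into `<k2, list(v2)>`, can be sketched in a few lines. This assumes all pairs fit in memory; `GroupByKey.group` is a hypothetical helper, and the `TreeMap` is chosen only so keys come out sorted, as they do after the shuffle's sort.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByKey {
    // Collect every value that shares a key into one list: list(<k2, v2>) -> <k2, list(v2)>.
    // Each entry of the returned map is the input to one reduce call.
    public static <K extends Comparable<? super K>, V> TreeMap<K, List<V>> group(List<Map.Entry<K, V>> pairs) {
        TreeMap<K, List<V>> groups = new TreeMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return groups;
    }
}
```

For word count, grouping `<"be", 1>, <"to", 1>, <"be", 1>` yields `"be" -> [1, 1]` and `"to" -> [1]`, and the reducer only has to sum each list.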

#### MapReduce steps

1. Input: The system reads the input files from GFS.
2. Split: The input is divided into splits, which are distributed across different machines.
3. Map: Each map task works on one split of the data and outputs intermediate `<key, value>` pairs.
4. Transmission: The system-provided shuffle reorganizes the intermediate data so that all `<key, value>` pairs associated with a given key go to the same machine, typically by computing `hash(key) mod R` for `R` reducers.
5. Reduce: Each reducer processes every key assigned to it together with the list of all values for that key.
6. Output: The reducer output is written back to GFS.
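
The six steps can be simulated end to end inside a single JVM to make the data flow concrete. This is a sketch under stated assumptions: the input is an in-memory list of lines rather than a GFS file, splits are fixed-size groups of lines, and `LocalMapReduce`, `linesPerSplit`, and `numReducers` are hypothetical names. A real cluster runs each map and reduce task on a separate machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalMapReduce {
    public static Map<String, Integer> wordCount(List<String> inputLines, int linesPerSplit, int numReducers) {
        // Steps 1-2 (input + split): cut the in-memory input into chunks of linesPerSplit lines.
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < inputLines.size(); i += linesPerSplit) {
            splits.add(inputLines.subList(i, Math.min(i + linesPerSplit, inputLines.size())));
        }

        // One sorted key -> list(values) table per reducer.
        List<TreeMap<String, List<Integer>>> reducerInputs = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            reducerInputs.add(new TreeMap<>());
        }

        // Steps 3-4 (map + shuffle): each map task emits <word, 1>, and every pair is
        // routed to reducer hash(word) mod R.
        for (List<String> split : splits) {
            for (String line : split) {
                for (String word : line.split("\\s+")) {
                    if (word.isEmpty()) {
                        continue;
                    }
                    int reducer = Math.floorMod(word.hashCode(), numReducers);
                    reducerInputs.get(reducer).computeIfAbsent(word, k -> new ArrayList<>()).add(1);
                }
            }
        }

        // Step 5 (reduce): each reducer sums the values of every key it owns.
        // Step 6 (output): results are collected here; a real job writes one output file per reducer.
        Map<String, Integer> output = new TreeMap<>();
        for (TreeMap<String, List<Integer>> partition : reducerInputs) {
            for (Map.Entry<String, List<Integer>> group : partition.entrySet()) {
                int sum = 0;
                for (int count : group.getValue()) {
                    sum += count;
                }
                output.put(group.getKey(), sum);
            }
        }
        return output;
    }
}
```

For example, `wordCount(List.of("to be or", "not to be", "be"), 2, 3)` uses two splits and three reducers and counts `be` three times and `to` twice.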

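```java
import java.util.Iterator;

public class WordCount 
{
    // Simplified stand-in for Hadoop's OutputCollector; the real
    // org.apache.hadoop.mapred API works on Writable types such as Text and IntWritable.
    public interface OutputCollector<K, V>
    {
        void collect( K key, V value );
    }

    public static class Map 
    {
        // Key is the file location (ignored); value is the file content
        public void map( String key, String value, OutputCollector<String, Integer> output ) 
        {
            String[] tokens = value.split( "\\s+" );

            for( String word : tokens ) 
            {
                // skip the empty token produced by leading whitespace
                if ( word.isEmpty() ) continue;
                // emit <word, 1>; a real framework buffers these pairs and spills them to local disk in batches
                output.collect( word, 1 );
            }
        }
    }

    public static class Reduce 
    {
        // values iterates over every count emitted for this key by all mappers
        public void reduce( String key, Iterator<Integer> values, OutputCollector<String, Integer> output )
        {
            int sum = 0;
            while ( values.hasNext() ) 
            {
                sum += values.next();
            }
            output.collect( key, sum );
        }
    }
}
```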

### References

* [Instagram search](https://instagram-engineering.com/search-architecture-eeb34a936d3a)

