MapReduce¶
Vertical scaling (scale up): Increase the power of a single machine.
- Add more CPU cores
- Add more RAM
- Use faster disks (SSD → NVMe)
- Use a bigger, more powerful server
Horizontal scaling (scale out): Increase the number of machines.
- Add more servers
- Run the same service on multiple nodes
- Split data and work across machines
Motivation from File Splitting¶
Why splitting by line number fails
- Text files do not store line numbers.
- To locate line N, the system must scan from the beginning and count newline characters.
- For very large files, partitioning becomes inherently sequential.
- Later workers end up scanning most or all of the file.
- Result: no real parallelism and no speedup.
Why byte offsets enable parallelism
- Filesystems support random access to byte positions.
- Workers can seek directly to their assigned byte ranges.
- Each worker reads only its own chunk without scanning earlier data.
- This enables true parallel I/O.
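The idea can be sketched in a few lines of Python. This is only an illustration: `byte_ranges` and `read_range` are hypothetical helper names, and an in-memory `io.BytesIO` stands in for a real file.

```python
import io

def byte_ranges(file_size: int, num_workers: int) -> list[tuple[int, int]]:
    """Return half-open (start, end) byte ranges, at most one per worker."""
    if file_size == 0:
        return []
    chunk = -(-file_size // num_workers)  # ceiling division
    return [(start, min(start + chunk, file_size))
            for start in range(0, file_size, chunk)]

def read_range(f, start: int, end: int) -> bytes:
    """Seek straight to `start` and read only this worker's bytes."""
    f.seek(start)
    return f.read(end - start)

data = b"Shark fin soup\nShark tank\n"
ranges = byte_ranges(len(data), 3)
# Each "worker" opens its own handle and never touches earlier bytes.
chunks = [read_range(io.BytesIO(data), s, e) for s, e in ranges]
```

No worker counts newlines or scans from the start of the file. Note that the second and third chunks meet in the middle of the word "Shark".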
Limitations of byte-based splitting
- Byte boundaries may split words or records.
- A worker may see only part of a logical unit.
- Example: "Shark" → "Sha" + "rk" across workers.
Why ad-hoc fixes are not sufficient
- Partial records can be fixed by tracking fragments at chunk boundaries.
- Such fixes rely on assumptions about tokenization or record structure.
- They are specific to a particular problem (e.g., word count).
- Ad-hoc solutions do not generalize.
How MapReduce solves the problem
- MapReduce provides a general abstraction using Map and Reduce.
- All data is represented as key–value pairs.
- For text input:
- Key: byte offset
- Value: text at that offset
- Map performs classification.
- Reduce performs aggregation.
- The framework handles partitioning, boundary issues, and aggregation in a scalable way.
Core Idea¶
What MapReduce is
- A programming model where the programmer defines only two functions: map and reduce.
- Conceptually:
map(k1, v1) → list[(k2, v2)]
reduce(k2, list[v2]) → list[(k3, v3)]
- The framework, not the programmer, handles parallelism, grouping, and execution.
Map and Reduce intuition
- Map = classification
- Takes one input record at a time.
- Extracts structure from raw data.
- Output keys usually differ from input keys.
- Reduce = aggregation
- Takes a key and all values associated with it.
- Combines them into aggregated results (sum, count, etc.).
- Often keeps the same key and produces a single value.
Reduce can emit zero, one, or many key–value pairs and may change keys; when reducers restructure data rather than just aggregate it, the output often becomes input to another MapReduce iteration.
Word count example
- Input records are (byte_offset, line_of_text).
- Map extracts words and emits (word, count) or (word, 1).
- Reduce receives (word, [counts…]) and sums them.
- Output is (word, total_count) for every unique word.
- Counting one word feels like overkill, but the same logic scales naturally to all words.
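A minimal single-process sketch of this word count follows. It only imitates the model: `run_job` is a hypothetical stand-in for the framework that groups values by key in memory, and the tokenizer (whitespace split, lowercased) is an assumption.

```python
from collections import defaultdict

def map_fn(offset: int, line: str):
    # Emit (word, 1) for every word in the record.
    for word in line.split():
        yield (word.lower(), 1)

def reduce_fn(word: str, counts: list[int]):
    # Sum all counts that were grouped under this word.
    yield (word, sum(counts))

def run_job(records, map_fn, reduce_fn):
    """Toy driver: map every record, group by key, then reduce each key."""
    groups = defaultdict(list)
    for k1, v1 in records:
        for k2, v2 in map_fn(k1, v1):
            groups[k2].append(v2)
    output = []
    for k2 in sorted(groups):
        output.extend(reduce_fn(k2, groups[k2]))
    return output

# Keys are byte offsets of each line; values are the line text.
records = [(0, "the shark saw the boat"), (23, "the boat sank")]
result = dict(run_job(records, map_fn, reduce_fn))
```

In a real cluster the grouping step is the distributed shuffle rather than a dictionary, but the two user-written functions look the same.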
Why keys are required
- Even if the final answer is “just a number,” MapReduce requires keys.
- Keys define how intermediate data is grouped across machines.
- Using words as keys makes distributed aggregation possible.

Memory and scale considerations
- Individual counters are small, but memory grows with the number of unique keys.
- Assumptions like “lines are small” or “few unique words” break down at scale.
- A single “line” may represent very large content (e.g., an entire book).
Alternative mapping strategy
- Instead of building a dictionary per record, Map can emit (word, 1) immediately.
- Avoids per-record hash tables.
- Duplicate keys are expected and efficiently handled by the framework.
- Enables system-level optimizations such as combiners.
Emitting immediately avoids unbounded in-memory state. We cannot know how many unique words a record contains, and although each map call handles only one line, that line may hold an entire book. Emitting right away lets MapReduce buffer, spill to disk, and optionally combine key–value pairs, trading disk and network I/O for guaranteed memory safety.

Emit
- emit(key, value) means “this is the output of the function”.
- Output is not returned as a list.
- Output is handled asynchronously by the framework while the function continues running.
Problem: too much data to Reduce
- For word count, each word produces (word, 1).
- The intermediate key–value pairs take up more space than the original text.
- If the input file is 10 TB, more than 10 TB may arrive at reducers.
- This is because each (word, 1) pair is slightly larger than the raw word it came from.
Distribution with multiple reducers
- There can be multiple reducers.
- Each reducer gets all pairs for a given key.
- No key is split across reducers.
Partition function
- MapReduce uses a third function: partition.
- Signature: (key, value, reducer_count) → reducer_id.
- Partition assigns each key–value pair to exactly one reducer.
- Default behaviour hashes the key and ignores the value.
- Using the key ensures all values for the same key go to the same reducer.
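A hash partitioner along these lines might look like the sketch below. The choice of `zlib.crc32` is an assumption made for the example: Python's built-in `hash()` for strings is randomized per process, so a stable hash is needed for every mapper to agree on the reducer.

```python
import zlib

def partition(key: str, value, reducer_count: int) -> int:
    # Hash the key only; the value is deliberately ignored.
    # crc32 gives the same answer on every machine and every run.
    return zlib.crc32(key.encode("utf-8")) % reducer_count

pairs = [("shark", 1), ("boat", 1), ("shark", 1), ("fin", 1)]
buckets = {r: [] for r in range(3)}
for k, v in pairs:
    buckets[partition(k, v, 3)].append((k, v))

# Reducers that received at least one ("shark", ...) pair.
shark_reducers = {r for r, items in buckets.items()
                  if any(k == "shark" for k, _ in items)}
```

Because the reducer id depends only on the key, both `("shark", 1)` pairs land in the same bucket regardless of which mapper emitted them.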
Shuffle
- Sending key–value pairs to reducers based on partitioning is called “shuffling”.
- Although the name suggests randomness, the process is deterministic.
- Determinism is the entire point of partitioning.
The shuffle in MapReduce is the process of transferring the intermediate key-value pairs produced by the mappers to the reducers, ensuring that all data with the same key is grouped and sent to the same reducer for final processing.
- In MapReduce, partitioning occurs first at the end of the Map phase, and the shuffle (data transfer) happens immediately afterward, followed by sorting.

MapReduce framework
The MapReduce framework abstracts away most distributed-systems complexity from the programmer. At a high level, it:
- Assigns workers to map and reduce tasks
- Splits input data across map workers
- Groups intermediate key–value pairs by key and routes them to reducers
- Handles failures such as worker crashes
- Manages data movement and aims to minimize cross–data-center transfers
Execution flow

- Input data is divided and sent to multiple mappers (often colocated with the data).
- Each mapper runs map and emits intermediate (key, value) pairs.
- Intermediate data is shuffled across the network and grouped by key.
- Reducers process all values for a key and emit final output.
Performance bottleneck: The slowest step is the shuffle, sending intermediate results from mappers to reducers. This dominates runtime because it requires large network transfers, while input-to-mapper data movement is usually minimized through data locality.
Combine optimization
Combine is an optional, local optimization that may run during the map phase.
- It aggregates multiple values for the same key on each mapper.
- This reduces the amount of data sent during the shuffle.
- It may run zero, one, or multiple times.
Conceptually, a combine function aggregates values per key and emits a smaller set of (key, value) pairs.
def combine(key, values):
    # Sum the partial counts for this key and emit a single pair.
    total = 0
    for v in values:
        total += v
    emit(key, total)
It is not guaranteed to run at all, and the framework may apply it zero, one, or multiple times.
The combine function has the same signature as reduce:
- Input: (key, list of values)
- Output: (key, value)
However, unlike reduce, the output key and value types must match the input types, because combine output is fed to reduce in place of raw map output.
Conceptually, combine should:
- Produce one key–value pair per key
- Merge many values into a single partial result
- Reduce the amount of intermediate data sent over the network
The purpose of combine is purely performance:
- It runs locally (usually on mapper output)
- It reduces shuffle traffic between mappers and reducers
Execution details from the diagram:

- Each mapper emits intermediate (key, value) pairs
- Combine may run on each mapper’s local output
- Partial results are grouped by key
- Grouped data is shuffled to reducers
- Reducers finish aggregation and produce final output
- Reducers themselves may also apply combine when merging intermediate files
Important caveats:
- Combine is not part of the logical correctness guarantee
- The framework can skip it entirely
- Even if legal, combine can be semantically wrong
Programmer-defined functions
The programmer defines four functions:
- map: (k1, v1) → List[(k2, v2)]
- combine (optional): (k2, List[v2]) → List[(k2, v2)]
- reduce: (k2, List[v2]) → List[(k3, v3)]
- partition: (k2, v2, N) → reducer_id
Correctness considerations for combine
Combine can be the same as reduce only if the operation is safe for partial aggregation.
- Input and output types must match.
- The operation must be associative and decomposable.
Some operations are unsafe. For example, averages:
Mean(2, 3, 4) = 3
Mean(Mean(2, 3), 4) = Mean(2.5, 4) = 3.25
This shows why combine cannot reuse reduce logic for averages.
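The difference between a safe and an unsafe operation can be checked directly. In this small sketch, `apply_in_two_stages` is a hypothetical helper that mimics a combiner pre-aggregating part of the values before the final reduce.

```python
from statistics import mean

def apply_in_two_stages(op, first_group, rest):
    """Apply `op` to a partial group, then to that partial result plus the rest."""
    return op([op(first_group)] + rest)

values = [2, 3, 4]

# Sum is associative: staging the work does not change the answer.
sum_direct = sum(values)
sum_staged = apply_in_two_stages(sum, [2, 3], [4])

# Mean is not: the partial mean loses how many values it summarized.
mean_direct = mean(values)
mean_staged = apply_in_two_stages(mean, [2, 3], [4])
```

Here `sum_direct` and `sum_staged` agree, while `mean_staged` drifts away from `mean_direct`.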
Physical View¶
Physical execution view of MapReduce¶
This diagram shows how MapReduce runs on a real cluster.

- The user submits a job to the cluster.
- A master node schedules map tasks and reduce tasks.
- Number of map tasks depends on input file splits.
- Number of reduce tasks is chosen by the user.
- Map workers read their assigned input splits and apply the map function.
- As mapping proceeds, intermediate key–value pairs are written to the mapper’s local disk, not the distributed file system.
- Near the end of the map phase, reducers begin pulling intermediate files from mappers (remote reads).
- Once a reducer has received all its intermediate data, it runs reduce and writes final output to the network file system.
Key idea:
Intermediate data stays local to mappers until reducers explicitly fetch it. This avoids overwhelming the distributed file system but makes the shuffle phase network-heavy.
Distributed group-by and shuffle details¶
This diagram zooms in on what happens during grouping and shuffle.

Map side behaviour¶
- Map outputs are first buffered in memory (circular buffer).
- When the buffer fills, data is spilled to disk.
- Multiple spills are merged into a single, partitioned intermediate file.
- Data is sorted by key within each partition.
- A combiner may run during these merges if CPU is idle.
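The buffer, spill, and merge steps resemble an external merge sort. The sketch below is a simplification under stated assumptions: spills are kept as in-memory lists instead of disk files, partitioning is omitted, and `spill_sorted_runs` and `merge_runs` are hypothetical names.

```python
import heapq

def spill_sorted_runs(pairs, buffer_size):
    """Buffer pairs; whenever the buffer fills, sort it and 'spill' it as a run."""
    runs, buffer = [], []
    for pair in pairs:
        buffer.append(pair)
        if len(buffer) >= buffer_size:
            runs.append(sorted(buffer))
            buffer = []
    if buffer:  # final partial buffer
        runs.append(sorted(buffer))
    return runs

def merge_runs(runs):
    """Merge already-sorted runs into one sorted sequence."""
    return list(heapq.merge(*runs))

pairs = [("b", 1), ("a", 1), ("c", 1), ("a", 1), ("b", 1)]
runs = spill_sorted_runs(pairs, buffer_size=2)
merged = merge_runs(runs)
```

After the merge, equal keys sit next to each other, which is the point at which a combiner could collapse them cheaply.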
Reduce side behaviour¶
- Reducers copy intermediate files from all mappers over the network.
- Incoming data is merged in multiple passes (in memory and on disk).
- A combiner may run again during these merge steps.
- The final merge feeds directly into the reduce function.
Synchronization note¶
- There is a logical barrier between map and reduce phases.
- However, reducers can begin copying intermediate data before all mappers finish, overlapping computation and communication.
- In practice, a combiner is run opportunistically: the framework may execute it when the CPU is idle (for example, while waiting for I/O or other mappers to finish), but it may skip it if resources are busy. Reducers may also run combiners while waiting for mapper outputs to arrive, which is why a combiner must not be relied on for correctness. And a combiner must be safe under repeated or missing application, otherwise results may be incorrect.
Data Centre¶
Servers are physically organized into racks. A network switch connects all servers within a rack and also connects that rack to other racks. Multiple racks together form a data centre (this is a simplified view of real data centres).
Where data is located has a major impact on performance.
Accessing data on the local machine (CPU cache, memory, local SSD or disk) provides the lowest latency and highest bandwidth. Accessing data from another machine in the same rack is slower. Accessing data from a machine in a different rack is slower still, and accessing data from a different data centre is the slowest.
- Bandwidth is the maximum amount of data that can be transferred per unit time.
- Latency = how long it takes for the first byte to arrive

This creates a storage and access hierarchy:
- Lower latency and higher bandwidth come from local storage
- Higher capacity comes from using remote machines, but at the cost of increased latency and reduced bandwidth
Key tradeoff:
- Systems prefer local data for speed
- Systems use remote data to scale capacity, accepting performance penalties
Implication for distributed systems (e.g., MapReduce):
- Computation is moved to the data whenever possible
- Cross-rack and cross-datacenter data movement is expensive and should be minimized
Distributed File System¶
The problem being solved¶
We have a very large file (e.g., 200 TB), but each server only has 100 TB of disk. The file cannot fit on a single machine, so it must be distributed across many servers. This is the core problem that distributed file systems solve.
Sharding means:
- Splitting a large file (or dataset) into smaller pieces called chunks
- Distributing those chunks across multiple servers
Each server stores only a subset of the chunks. No single server has the entire file.
Example:
- File.txt → chunks 1, 2, 3, 4, 5, 6, 7, 8
- Chunk 1 → S1
- Chunk 2 → S3
- Chunk 3 → S7
- ... This distribution of chunks across servers is called sharding.
Purpose of sharding:
- Allows storage of files larger than any single disk
- Enables parallel access and processing
- Scales storage capacity by adding more servers
What goes wrong without fault tolerance¶
If each chunk is stored on exactly one server:
- When that server fails, the chunk is lost
- The file becomes corrupted or incomplete
Because commodity servers fail frequently, this approach is unsafe.
Replication (fault tolerance)¶
To handle failures, each chunk is stored on multiple servers. This is called replication.
Example:
- Chunk 2 stored on S3, S19, and S20
- If S3 fails, the chunk can still be read from S19 or S20
Key idea:
- The number of replicas determines how resilient the system is
- More replicas → higher fault tolerance → more storage cost
Sharding vs replication (important distinction)
- Sharding: splits the file into different pieces across servers
- Replication: stores copies of the same piece on multiple servers
Distributed file systems (e.g., HDFS) use both:
- Sharding to scale storage and throughput
- Replication to tolerate failures
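The interaction of the two ideas can be sketched as follows. The round-robin placement is an assumption chosen for brevity, not the policy of HDFS or any real system, and `place_chunks` and `readable_chunks` are hypothetical names.

```python
def place_chunks(num_chunks: int, servers: list[str], replicas: int = 3) -> dict[int, list[str]]:
    """Sharding + replication: chunk i is stored on `replicas` consecutive servers."""
    if replicas > len(servers):
        raise ValueError("need at least as many servers as replicas")
    n = len(servers)
    return {
        chunk: [servers[(chunk - 1 + r) % n] for r in range(replicas)]
        for chunk in range(1, num_chunks + 1)
    }

def readable_chunks(placement: dict[int, list[str]], failed: set[str]) -> list[int]:
    """A chunk is still readable if at least one of its holders is alive."""
    return [c for c, holders in placement.items()
            if any(s not in failed for s in holders)]

servers = ["S1", "S2", "S3", "S4", "S5"]
replicated = place_chunks(8, servers, replicas=3)
unreplicated = place_chunks(8, servers, replicas=1)
```

With three replicas, losing two servers still leaves every chunk readable. With a single copy per chunk, losing one server already makes the file incomplete.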
Hadoop Distributed File System¶
Core Concepts¶
- Data Coherency: Follows a "Write-once-read-many" model. Clients can only append to existing files; they cannot modify existing data.
- You write the file initially, you can add to the tail, but you can never rewrite the history. After that, you just read it.
- Block Structure: Files are broken into large blocks (typically 64 MB in early Hadoop releases; 128 MB is the default in Hadoop 2 and later). Each block is replicated across multiple servers (DataNodes) for fault tolerance.
- Intelligent Client: The client asks the master where data lives, but then accesses the data directly from the storage nodes (DataNodes).
Architecture¶

NameNode (Master)¶
The NameNode is the central manager.
- Metadata Storage: Stores all metadata in main memory (RAM) for speed (no demand paging). This includes:
- List of files and their attributes.
- Mapping of File -> Blocks.
- Mapping of Block -> DataNodes.
- Responsibilities:
- Manages the file system namespace.
- Maintains a Transaction Log of changes (creations, deletions).
- Acts as the Replication Engine: Balances disk usage and detects failed nodes.
- The "Self-Healing" Mechanism (Reaction to Failure): The most critical job of the Replication Engine is to handle failures automatically.
- Detection: The NameNode constantly monitors "heartbeats" from DataNodes. If a node goes silent (stops sending heartbeats), the Replication Engine assumes it is dead.
- Reaction: It immediately notices that the blocks on that dead node are now "under-replicated" (e.g., you only have 2 copies instead of the required 3).
- Resolution: It chooses a new, healthy DataNode and instructs it to create a fresh copy of the missing blocks to restore the safety level.
- The "Load Balancer" (Optimization): The engine also ensures the cluster runs efficiently.
- Disk Balancing: It monitors how full each DataNode's hard drive is. If one node is getting too full, the engine won't put new blocks there; it chooses empty nodes instead.
- Traffic Balancing: It tries to spread data out so that no single node is overwhelmed by client requests.
- Instructions: If the NameNode notices a problem (or needs to replicate a block), it sends instructions back to the DataNode (e.g., "Copy this block to another node").
DataNode (Slave)¶
The DataNode is the worker that stores the actual data.
- Function: Acts as a Block Server. It stores raw data and metadata (like checksums) on its local OS file system (e.g., ext3).
- Communication: The DataNodes constantly send "heartbeats" and status updates to the NameNode to say, "I am alive" and "Here are the blocks I have".
- Heartbeats: Sends a signal to the NameNode every 3 seconds to say "I'm alive".
- Block Reports: Periodically sends a list of all blocks it owns to the NameNode.
- Pipelining: Facilitates data transfer by forwarding data directly to other DataNodes.
- Pipelining speeds up the replication process by having the client upload data to only one DataNode, which then simultaneously writes the data to its disk while forwarding it to the next DataNode in the chain, effectively creating a "bucket brigade" of data transfer.
- For example, we need to store 3 copies of a block. The client uploads the data only once to the first DataNode, let’s say Node A. That node then acts as a relay. As soon as Node A receives a small chunk of data, it immediately forwards it to Node B while it writes it to its own disk. It doesn't wait for the whole block to finish before passing it on.
Fault Tolerance & Replication¶
- Terminology: GFS (Google File System) uses "Master/Chunkserver" while Hadoop uses "NameNode/DataNode".
- Implementation differences between GFS and HDFS:
- Different consistency model for file appends
- Implementation language
- Performance
- Block Placement Policy (Default 3 Replicas):
- Replica 1: On the local node (where the writer is).
- Replica 2: On a remote rack (to survive rack failure).
- Replica 3: On the same remote rack (different node).
- Clients read from the nearest replica, that is, the copy closest to the client in terms of network distance.
- Recovery: If the NameNode misses heartbeats, it assumes the DataNode failed and creates new replicas elsewhere.
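The detection and reaction steps can be sketched as two small functions. Everything here is illustrative: the timeout value, the node and block names, and the function names are assumptions, not HDFS settings or APIs.

```python
HEARTBEAT_TIMEOUT = 30.0  # seconds; hypothetical value for the example

def dead_nodes(last_heartbeat: dict[str, float], now: float,
               timeout: float = HEARTBEAT_TIMEOUT) -> set[str]:
    """Nodes whose most recent heartbeat is older than the timeout."""
    return {node for node, t in last_heartbeat.items() if now - t > timeout}

def under_replicated(block_locations: dict[str, list[str]], dead: set[str],
                     target: int = 3) -> dict[str, int]:
    """Map each affected block to the number of replicas that must be re-created."""
    missing = {}
    for block, nodes in block_locations.items():
        live = [n for n in nodes if n not in dead]
        if len(live) < target:
            missing[block] = target - len(live)
    return missing

last_heartbeat = {"A": 100.0, "B": 98.0, "C": 40.0, "D": 99.0}
block_locations = {"blk1": ["A", "B", "C"], "blk2": ["A", "B", "D"]}

dead = dead_nodes(last_heartbeat, now=101.0)
to_repair = under_replicated(block_locations, dead)
```

The master would then pick a healthy node for each entry in `to_repair` and instruct it to copy the block from a surviving replica.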
Hadoop Cluster Architecture¶
Data Locality Strategy¶
- The Problem: In traditional supercomputing (Compute-Intensive), data is moved from a Storage Area Network (SAN) to compute nodes. For Data-Intensive tasks, moving petabytes of data across the network creates a massive bottleneck.

- The Solution: "Don't move data to workers... move workers to the data!"
- Co-location: The system runs the execution process (Node Manager) on the exact same physical machine as the storage process (DataNode).
- That is, co-locate storage and compute: start workers on the nodes that hold the data.
- This eliminates network transfer for local data blocks.

If a server is responsible for both data storage and processing, Hadoop can do a lot of optimization. For example, when assigning MapReduce tasks to servers, Hadoop considers which servers hold which parts of the file locally, to minimize copying over the network. If all of the data can be processed locally where it is stored, there is no need to move it.
- Resource Manager: Manages the available resources for the whole cluster.
- Node Manager: Manages tasks running on a single specific node.
- The Process: The Resource Manager coordinates with the NameNode (which knows where blocks are). It purposefully assigns map tasks to workers that already have the file block locally, so the worker never has to wait for a file download.
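A locality-aware assignment can be sketched as a greedy loop. This is a toy model, not Hadoop's scheduler: `assign_map_tasks`, the slot counts, and the node names are all hypothetical.

```python
def assign_map_tasks(block_locations: dict[str, list[str]],
                     free_slots: dict[str, int]) -> dict[str, tuple[str, bool]]:
    """Greedy placement: prefer a node that already stores the block.

    Returns block -> (chosen node, whether the read is local).
    """
    slots = dict(free_slots)  # copy so the caller's dict is untouched
    assignment = {}
    for block, holders in block_locations.items():
        local = [n for n in holders if slots.get(n, 0) > 0]
        candidates = local or [n for n, s in slots.items() if s > 0]
        if not candidates:
            break  # cluster is full; remaining tasks would wait
        node = candidates[0]
        slots[node] -= 1
        assignment[block] = (node, node in holders)
    return assignment

block_locations = {"b1": ["n1", "n2"], "b2": ["n1", "n3"], "b3": ["n1", "n2"]}
assignment = assign_map_tasks(block_locations, {"n1": 1, "n2": 1, "n3": 1})
remote = assign_map_tasks({"b1": ["n1", "n2"]}, {"n3": 1})
```

In the first call every task runs where its block lives. In the second call no holder has capacity, so the task falls back to a remote read.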
Combiners (Optimization)¶
A Combiner is an optional "mini-reducer" that runs in memory on the Mapper side (or Reducer side) to pre-aggregate data before sending it over the network.
- Role: Reduces the volume of intermediate files and network traffic.
- Rules:
- Must have the same signature as the Reducer, with the added constraint that its input and output types are identical.
- Optional Execution: The framework does not guarantee the combiner will run. It may run 0 times, 1 time, or many times (on merged spills), so the logic must be robust.
- Even if a Reducer has matching input and output types (k2=k3, v2=v3), this compatibility only means it might work as a Combiner, but it does not guarantee logical correctness.
Case Study: Computing the Mean¶
The Naive Approach (v1)¶
def map(key: String, value: Int):
    emit(key, value)

def reduce(key: String, values: List[Int]):
    total_sum = 0
    count = 0
    for value in values:
        total_sum += value
        count += 1
    emit(key, total_sum / count)
This version works but is slow because it sends every single number over the network to the reducer.
The Incorrect Optimization (v2) - INVALID¶
def map(key: String, value: Int):
    emit(key, value)

def combine(key: String, values: List[Int]):
    total_sum = 0
    total_count = 0
    for value in values:
        total_sum += value
        total_count += 1
    emit(key, (total_sum, total_count))  # INVALID: Changes data type from Int to Tuple

def reduce(key: String, values: List[(Int, Int)]):
    total_sum = 0
    total_count = 0
    for (v, c) in values:
        total_sum += v
        total_count += c
    emit(key, total_sum / total_count)
This version attempts to use a Combiner to pre-calculate the sum and count. It is INVALID because the Combiner outputs a Tuple (sum, count), but the Mapper outputs an Int. If the Combiner is skipped (which is allowed), the Reducer receives an Int but expects a Tuple, causing a crash.
The Correct Optimization (v3)¶
def map(key: String, value: Int):
    emit(key, (value, 1))

def combine(key: String, values: List[(Int, Int)]):
    total_sum = 0
    total_count = 0
    for (v, c) in values:
        total_sum += v
        total_count += c
    emit(key, (total_sum, total_count))

def reduce(key: String, values: List[(Int, Int)]):
    total_sum = 0
    total_count = 0
    for (v, c) in values:
        total_sum += v
        total_count += c
    emit(key, total_sum / total_count)
- Map: Adjusted to emit (key, (value, 1)). It essentially says "Here is a value of X, and it represents a count of 1".
- Combiner/Reduce: Now both receive (sum, count) tuples.
- They sum the values: total_sum += v
- They sum the counts: total_count += c
- Result: This works because the input and output types remain consistent throughout the pipeline.
This version fixes the type mismatch. The Mapper now emits a Tuple (value, 1) immediately. This ensures that both the Combiner and the Reducer always receive the same input type (Int, Int), regardless of whether the Combiner runs or not.
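One way to convince yourself of this is to run the same job with the combiner applied zero, one, and two times. The harness below is a hypothetical single-key simulation; `run`, `combine_passes`, and `group_size` are names invented for the sketch.

```python
def map_fn(key, value):
    return [(key, (value, 1))]

def combine(key, values):
    # Same (sum, count) type in and out, so it can be applied repeatedly.
    return (sum(v for v, _ in values), sum(c for _, c in values))

def reduce_fn(key, values):
    total_sum, total_count = combine(key, values)
    return total_sum / total_count

def run(values, combine_passes, group_size=2):
    """Compute the mean for one key, combining groups `combine_passes` times."""
    pairs = [v for x in values for _, v in map_fn("k", x)]
    for _ in range(combine_passes):
        pairs = [combine("k", pairs[i:i + group_size])
                 for i in range(0, len(pairs), group_size)]
    return reduce_fn("k", pairs)

results = [run([2, 3, 4, 7], passes) for passes in (0, 1, 2)]
```

All three runs give the same mean, which is the property the framework needs in order to treat the combiner as optional.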
In-Mapper Combining (IMC)¶
- Concept: A "Design Pattern" where the aggregation logic is moved inside the Mapper to reduce network traffic and object creation.
- Mechanism:
- Instead of emitting every single (key, value) pair immediately, the Mapper maintains a local HashMap (or associative array) in memory.
- It accumulates counts across many input records and only emits the final sums when the map task is finished.
- Performance: It is typically faster than standard Combiners, because pairs are aggregated before they are ever serialized, buffered, or spilled.
- Risk: Memory Management. If the number of unique keys is too large to fit in the Mapper's RAM, the job will fail. You must ensure the data scale fits in memory.
- Why use it?
- Memory is faster than the filesystem which is faster than network access
- Preserve state across calls to map
Faster Optimization (v4)¶
class mapper:
    def setup(self):
        # Initialize state before processing begins
        self.sums = Map()    # pseudocode: missing keys default to 0
        self.counts = Map()

    def map(self, key, value):
        # Accumulate locally in memory. DO NOT emit yet.
        self.sums[key] += value
        self.counts[key] += 1

    def cleanup(self):
        # Emit only once after all inputs are processed
        for (key, count) in self.counts.items():
            emit(key, (self.sums[key], count))
While functional programming typically discourages maintaining "state" (memory) across function calls, IMC allows you to deviate from this rule for performance gains, provided you are careful. The critical constraint is scale: you must be absolutely certain that the number of unique keys can fit into the Mapper's RAM. If the dataset has too many keys (e.g., millions of unique words), the mapper will run out of memory and crash, meaning "probably fine" is not an acceptable safety standard.
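A common mitigation for the memory risk, not covered in these notes and shown here only as a sketch, is to flush the in-memory map whenever it reaches a fixed number of keys. `BoundedMeanMapper`, `max_keys`, and the `emit` callback are hypothetical names.

```python
from collections import defaultdict

class BoundedMeanMapper:
    """In-mapper combining for the mean, with a cap on in-memory keys."""

    def __init__(self, emit, max_keys):
        self.emit = emit
        self.max_keys = max_keys
        self.sums = defaultdict(int)
        self.counts = defaultdict(int)

    def map(self, key, value):
        self.sums[key] += value
        self.counts[key] += 1
        if len(self.counts) >= self.max_keys:
            self.flush()  # give up some aggregation to bound memory

    def flush(self):
        for key, count in self.counts.items():
            self.emit(key, (self.sums[key], count))
        self.sums.clear()
        self.counts.clear()

    def cleanup(self):
        self.flush()  # emit whatever is still buffered

emitted = []
mapper = BoundedMeanMapper(lambda k, v: emitted.append((k, v)), max_keys=3)
for key, value in [("a", 2), ("a", 4), ("b", 10), ("c", 1), ("a", 6)]:
    mapper.map(key, value)
mapper.cleanup()
```

The key "a" is emitted twice with partial (sum, count) pairs. That is harmless because the reducer merges partial pairs anyway, and in exchange the mapper never holds more than `max_keys` keys.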
The same pattern applies on the reduce side. Even though the Reducer receives data grouped by key, it can keep state in memory across calls (still IMC) so that it remembers data across different keys and can restructure the output, for example by combining Key A and Key B, before writing the final file.
Why would you do this? To calculate Global Statistics.
Imagine you want to know: "What percentage of ALL fruit is Apples?"
- You cannot calculate this if you process "Apple" in isolation. You need the Total Count of all fruits first.
How IMC on Reducer solves this:
- reduce("Apple"): Don't emit! Just add 10 to a variable total_fruits in RAM. Save ("Apple", 10) in a list in RAM.
- reduce("Banana"): Don't emit! Add 5 to total_fruits. Save ("Banana", 5) in a list.
- cleanup(): Now that you have seen everything, you know total_fruits = 15.
- Calculate Apple % (10/15). Emit!
- Calculate Banana % (5/15). Emit!
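The fruit example can be written as a stateful reducer along these lines. The class name and the `emit` callback are hypothetical, and the sketch assumes a single reducer sees every key; with several reducers each one would only know its own partial total.

```python
class PercentageReducer:
    """Buffers per-key counts and emits shares of the global total in cleanup()."""

    def __init__(self, emit):
        self.emit = emit
        self.total_fruits = 0
        self.buffered = []  # (key, count) pairs held in RAM

    def reduce(self, key, values):
        count = sum(values)
        self.total_fruits += count
        self.buffered.append((key, count))  # do not emit yet

    def cleanup(self):
        # Every key has been seen, so the total is final.
        for key, count in self.buffered:
            self.emit(key, count / self.total_fruits)

emitted = []
reducer = PercentageReducer(lambda k, v: emitted.append((k, v)))
reducer.reduce("Apple", [4, 6])
reducer.reduce("Banana", [5])
reducer.cleanup()
```

As with in-mapper combining, the buffer grows with the number of distinct keys, so the same memory caveat applies.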
Can we do this for word frequency?
class mapper:
    def setup(self):
        # State must live on the instance so it survives between map() calls
        self.counts = HashMap()

    def map(self, key: Long, value: String):
        for word in tokenize(value):
            self.counts[word] += 1

    def cleanup(self):
        for (word, count) in self.counts.items():
            emit(word, count)
- The Math: A vocabulary of 1 million unique words would require approximately 24 MB of RAM (accounting for integer storage, string keys, and 50% overhead). Since this is very small compared to modern server RAM, the approach is deemed "probably" safe.
- The Risk: Making assumptions about data size is dangerous. If the input contains garbage data (random strings/URLs) instead of real words, the HashMap could explode in size and crash the Mapper.
Detour: why Long type for key? Key is the position of that line in the file. Big Data files are often massive (Terabytes or Petabytes). A standard 32-bit Integer can only count up to roughly 2 billion (2 GB). Since HDFS files are much larger than 2 GB, a Long (64-bit) is required to track the byte position of lines near the end of the file.
Term Co-Occurrence: The "Pairs" Approach¶
- Goal: Build a Co-Occurrence Matrix (\(N \times N\)) counting how often word \(u\) and word \(v\) appear together.
- The "Pairs" Strategy:
- The Mapper treats every co-occurring pair of words as a unique Key.
- If the sentence is "A B C", it emits pairs (A,B), (A,C), (B,C), etc., each with a count of 1.
- The reducer takes a pair of words and a list of counts, and outputs the pair with its total count. In this case, the reducer function can also serve as the combiner.
- Pros: Very easy to implement and understand.
- Cons (The "Traffic" Problem):
- Massive Output: It generates substantially more intermediate data pairs than the input size ("That's a lot of pairs!").
- Ineffective Combiners: Combiners provide little benefit here. Because the key space is massive (\(N \times N\)), the chance of the same pair appearing multiple times in the same local map split is very low (sparse), so there is almost nothing to aggregate locally.
Code: Pairs Algorithm¶
def map(key: Long, value: String):
    for u in tokenize(value):
        # Emit a "1" for every neighbor v found near u
        for each v that co-occurs with u in value:
            emit((u, v), 1)

def reduce(key: (String, String), values: List[Int]):
    # Standard summation
    total = 0
    for value in values:
        total += value
    emit(key, total)
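A runnable version of the Pairs pattern is sketched below. The definition of co-occurrence is an assumption: two words co-occur when they are within `window` positions of each other on the same line. The helper `neighbors` and the in-memory grouping are stand-ins for illustration.

```python
from collections import defaultdict

def neighbors(tokens, i, window):
    """Words within `window` positions of tokens[i], excluding position i itself."""
    lo, hi = max(0, i - window), min(len(tokens), i + window + 1)
    return [tokens[j] for j in range(lo, hi) if j != i]

def pairs_map(offset, line, window=1):
    tokens = line.split()
    for i, u in enumerate(tokens):
        for v in neighbors(tokens, i, window):
            yield ((u, v), 1)  # one record per co-occurring pair

def pairs_reduce(key, values):
    return (key, sum(values))

# Stand-in for the shuffle: group emitted values by pair.
grouped = defaultdict(list)
for offset, line in [(0, "a b c"), (6, "a b")]:
    for key, one in pairs_map(offset, line):
        grouped[key].append(one)

counts = dict(pairs_reduce(k, v) for k, v in grouped.items())
```

Two short lines already produce six intermediate records, which hints at how quickly the shuffle volume grows.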
The "Stripes" Design Pattern¶
The Stripes approach is an alternative to the "Pairs" pattern for calculating Co-occurrence Matrices.
- Structure: Instead of emitting many small pairs like (A, B), (A, C), the Mapper groups all co-occurring words for a specific term into a single Associative Array (Map) (or Dictionary).
- Data Flow:
- Input: Text line.
- Map Output: Key = word, Value = Map{neighbor: count}.
- Example: For term a, output is a -> {b1:c1, b2:c2, ...}.
Code: Stripes Algorithm¶
def map(key: Long, value: String):
    for u in tokenize(value):
        counts = {}  # Initialize a local dictionary/map (missing keys default to 0)
        for each v that co-occurs with u in value:
            counts[v] += 1
        # Emit the whole map at once
        emit(u, counts)

def reduce(key: String, values: List[Map[String->Int]]):
    final_map = {}
    for map_fragment in values:
        # "Adding" two maps means merging them:
        # Union of keys, summing values for intersecting keys
        final_map = merge_and_sum(final_map, map_fragment)
    emit(key, final_map)
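In Python the map-merging step has a direct counterpart in `collections.Counter`. The sketch below assumes the same window-based definition of co-occurrence as an illustration and uses an in-memory dictionary in place of the shuffle.

```python
from collections import Counter, defaultdict

def stripes_map(offset, line, window=1):
    tokens = line.split()
    for i, u in enumerate(tokens):
        lo, hi = max(0, i - window), min(len(tokens), i + window + 1)
        stripe = Counter(tokens[j] for j in range(lo, hi) if j != i)
        if stripe:
            yield (u, stripe)  # one record per word occurrence, not per pair

def stripes_reduce(word, stripes):
    merged = Counter()
    for stripe in stripes:
        merged.update(stripe)  # union of keys, summing shared counts
    return (word, merged)

# Stand-in for the shuffle: group stripes by word.
grouped = defaultdict(list)
for offset, line in [(0, "a b c"), (6, "a b")]:
    for word, stripe in stripes_map(offset, line):
        grouped[word].append(stripe)

result = dict(stripes_reduce(w, s) for w, s in grouped.items())
```

On this input the mapper emits five stripes, one per word occurrence, however many neighbors each word has.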
Analysis & Trade-offs¶
- Pros:
- Less Traffic: Generates significantly fewer Key-Value pairs to shuffle across the network compared to Pairs.
- Logical Grouping: The Reducer receives all information for a specific word u in one place.
- Cons:
- Object Overhead: Creating many small Map objects (HashMaps) is heavier on the Java/JVM object heap than simple integers ("Map is a heavier object than a single Int").
- CPU Intensive: The Combiner/Reducer has to perform complex Map merging logic (union of keys) rather than simple addition.
- The Critical Risk: Memory Overflow.
- "Will the map fit in memory???": For English text and normal sentence lengths, the stripe usually fits. However, in worst-case scenarios (or different datasets), a single Stripe (one word with millions of distinct neighbors) could grow too large and crash the node.
Do not blindly choose Stripes!
The Problem: Relative Frequencies¶
The Mathematical Goal¶
We want to calculate the conditional probability \(f(B|A)\): "How likely is word B to appear if word A appears?".
- \(N(A, B)\): The number of times A and B co-occur.
- \(N(A, *)\): The Marginal Sum (The total number of times A co-occurs with anything).
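Putting the two counts together, the relative frequency can be written as below. This is the usual conditional-frequency estimate implied by the definitions above; the summation index \(B'\) is notation introduced here for illustration.

\[
f(B|A) = \frac{N(A, B)}{N(A, *)}, \qquad N(A, *) = \sum_{B'} N(A, B')
\]

For instance, if A co-occurs 10 times with B1 and 20 times with B2, then \(N(A, *) = 30\) and \(f(B1|A) = 10/30 \approx 0.33\).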
Solution 1: Using Stripes¶
Stripes solves this easily because the Reducer for key A receives the entire map {B1: 10, B2: 20...}.
- Iterate the map once to calculate the sum (\(N(A, *)\)).
- Iterate again to divide each entry by the sum.
- Note: This relies on the assumption that all data for A fits in memory.
Solution 2: Using Pairs (The "Order Inversion" Pattern)¶
In the Pairs approach, the Reducer receives (A, B) counts separately. It sums them to get \(N(A, B)\), but it does not know the total \(N(A, *)\) because it processes one pair at a time.
- The Fix: We must calculate the total \(N(A, *)\) and make it available to the Reducer before it processes the specific pairs.
The "Pairs" Solution: Order Inversion & Partitioning¶
To support Relative Frequencies in Pairs, we emit a special "Star" key * to represent the marginal sum.
The Algorithm Requirements¶
- Emit Extra Data: For every word a, emit a special pair (a, *) representing the total count.
- Partitioner: Must send (a, *) and (a, b) to the same Reducer. We partition based on the left element (a) only.
- Sort Order: We must define a custom sort order so that (a, *) arrives at the Reducer first, before (a, b), (a, c), etc.
- Reducer State: The Reducer holds the count from (a, *) in a variable and uses it to divide subsequent values.
Code: Mapper and Partitioner¶
def map(key: Long, value: String):
    for u in tokenize(value):
        for v in cooccurrence(u):
            emit((u, v), 1)    # Specific count
            emit((u, "*"), 1)  # Marginal count contribution

# CUSTOM PARTITIONER
def partition(key: Pair, value: Int, N: Int):
    # Only look at the left word (u).
    # This ensures (dog, cat) and (dog, *) go to the same machine.
    return hash(key.left) % N
Code: Improved Mapper (Optimization)¶
Instead of emitting a * for every single neighbor (which floods the network), emit one * record per word per line containing the length of the co-occurrence window.
- If window = 2: You look 2 words to the left and 2 words to the right.
def map(key: Long, value: String):
    for u in tokenize(value):
        for v in cooccurrence(u):
            emit((u, v), 1)
        # Optimization: Emit the TOTAL count for 'u' in this line once.
        # The value is the SIZE of the neighborhood (len).
        emit((u, "*"), len(cooccurrence(u)))
Code: The "Stateful" Reducer¶
class MyPairsReducer(Reducer):
    # 1. THE STATE LIVES HERE (Class Level)
    # It survives between different calls to the reduce() function.
    current_marginal = 0.0

    def reduce(self, key: Pair, values: List[Int]):
        a = key[0]
        b = key[1]
        # 2. UPDATE STATE
        if b == "*":
            # We overwrite the class variable
            self.current_marginal = sum(values)
        # 3. READ STATE
        else:
            # We read the variable set by the PREVIOUS reduce call
            count = sum(values)
            emit(key, count / self.current_marginal)
- "*" Definition: The asterisk symbol is chosen because lexicographically it sorts before standard alphanumeric characters (or we force it to), ensuring it arrives first. We could equally use the empty string instead of *.
- Marginal Sum: The term comes from accounting, where sums were jotted down in the "margins" of a physical spreadsheet.
- Optimization Note: The "Improved Mapper" (emitting length) only works if the co-occurrence definition implies multiple neighbors. If you only look at the immediate neighbor (1:1), the length is always 1, so the optimization does nothing.
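The whole order-inversion pipeline can be simulated in one process. The sketch makes several assumptions: co-occurrence means "within `window` positions on the same line", a single reducer sees every key (so the partitioner is omitted), and `sort_key` stands in for the framework's custom sort order.

```python
from collections import defaultdict

def map_fn(offset, line, window=1):
    tokens = line.split()
    for i, u in enumerate(tokens):
        lo, hi = max(0, i - window), min(len(tokens), i + window + 1)
        nbrs = [tokens[j] for j in range(lo, hi) if j != i]
        for v in nbrs:
            yield ((u, v), 1)
        if nbrs:
            yield ((u, "*"), len(nbrs))  # one marginal record per occurrence

def sort_key(pair):
    left, right = pair
    # False sorts before True, so (u, "*") precedes every (u, v).
    return (left, right != "*", right)

class RelativeFrequencyReducer:
    def __init__(self):
        self.current_marginal = 0
        self.output = {}

    def reduce(self, key, values):
        if key[1] == "*":
            self.current_marginal = sum(values)  # remember N(u, *)
        else:
            self.output[key] = sum(values) / self.current_marginal

grouped = defaultdict(list)
for offset, line in [(0, "a b c"), (6, "a b")]:
    for key, value in map_fn(offset, line):
        grouped[key].append(value)

reducer = RelativeFrequencyReducer()
for key in sorted(grouped, key=sort_key):
    reducer.reduce(key, grouped[key])
```

If the sort order were left at its default, nothing would guarantee that the marginal arrives before the pairs it is supposed to divide.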
State can exist in the Mapper, in the Reducer, or even in both.
Where you put the state (the HashMap or variables) depends entirely on what problem you are trying to solve.
Here is the breakdown of the three scenarios:
1. State in the Mapper ("In-Mapper Combining")
- Goal: Performance (Optimization). You want to "compress" data to reduce network traffic.
- How it works: instead of emitting (Dog, 1) 1,000 times, you count them in memory and emit (Dog, 1000) once.
2. State in the Reducer ("Order Inversion" / "Global Math")
- Goal: Correctness (Math). You need a "Global Total" or "Marginal Count" to divide by.
- How it works:
- Approach A (Order Inversion): The Reducer receives a special "Total" record first (Dog, *). It saves this value in a state variable (current_marginal) and uses it to divide the next records (Dog, Cat).
- Approach B (Buffering): The Reducer saves every record to a list, sums them up to find the total, and then emits the percentages in cleanup().