
From MapReduce to Spark

Apache Spark and RDD Concepts

Hadoop 2.0 vs. Spark

  • Hadoop 2.0 Context:
    • Nodes function as resource managers.
    • It maintains the ability to perform MapReduce as it always has.
    • Crucially, it opens the door to doing "other things" beyond just MapReduce.
  • MapReduce Mechanism:
    • The traditional MapReduce flow was previously the only mechanism available. Hadoop 2.0 is the platform that manages resources and lets different kinds of programs run, while Spark is one specific, high-speed program that can run on that platform.
    • Flow: List[(K1,V1)] -> map -> List[(K2,V2)] -> reduce -> List[(K3,V3)].
      • The input of reduce is (K2,Iterable[V2])
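
To make the type flow concrete, here is a minimal plain-Python sketch of the classic MapReduce word-count functions (no framework involved; the function names are illustrative only):

def map_fn(k1, v1):                 # (K1, V1): e.g., (line offset, line text)
    # map: one input pair -> a list of (K2, V2) pairs
    return [(word, 1) for word in v1.split()]

def reduce_fn(k2, values):          # (K2, Iterable[V2]): e.g., ("hello", [1, 1, 1])
    # reduce: one key and all its values -> a list of (K3, V3) pairs
    return [(k2, sum(values))]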

Introduction to Apache Spark

  • Definition: A fast and expressive cluster computing engine that is compatible with Apache Hadoop.
  • Performance:
    • Up to 100x faster in memory.
    • Up to 10x faster on disk.
  • Usability:
    • Requires 2-5x less code than Hadoop MapReduce.
    • Offers rich APIs in Java, Scala, and Python.
    • Includes an interactive shell.
  • Efficiency Features:
    • Uses general execution graphs.
    • Leverages in-memory storage.
  • History: Developed at UC Berkeley AMPLab (2009), open-sourced in 2010, and became a top-level Apache project in 2014.

Key Concept: Resilient Distributed Datasets (RDDs)

  • Philosophy: Spark programs are written in terms of operations on distributed datasets.
  • Definition: An RDD is a collection of objects spread across a cluster, stored in RAM or on Disk.
    • RDD[T] denotes a collection of values of type T.
  • Key Characteristics:
    • Resilient: Automatically rebuilt on failure.
    • Distributed: Built through parallel transformations.
    • Partitions: RDDs are divided into "partitions." Workers operate on these partitions independently.

An RDD is immutable (you cannot change it once created). You can only build a new RDD by applying a "transformation" to an existing one.

  • Instead of being a static bucket of data, an RDD is defined by the steps taken to create it.
  • Example: You don't just "have" an RDD of uppercase words. You have an RDD that was built by taking a text file and applying a map(toUpperCase) transformation.
  • Spark does not process data one item at a time (like a standard Python for loop).
  • It assigns different partitions to different workers.
  • These workers execute the transformation logic (like map or filter) independently and at the same time.
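
A minimal PySpark sketch of this "recipe" idea (assuming an existing SparkContext sc and a hypothetical file words.txt):

lines = sc.textFile("words.txt")         # base RDD: only a reference to the file
upper = lines.map(lambda s: s.upper())   # new RDD defined by the map(toUpperCase) step
# Nothing has been read or computed yet; `upper` is just the recorded recipe.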

An RDD stores a reference to the original data plus the transformation lineage, and Spark recomputes the data from the original source for each action (lazy evaluation!) unless the RDD is cached/persisted. Even a base RDD does not store the data itself; it stores a reference to the data source and how to read it.

This "recipe" approach is why RDDs are Resilient (the "R" in RDD). If a worker node crashes and loses a partition of data, Spark doesn't panic. It looks at the steps it stored and says, "Oh, I know how to make that piece of data again. I just need to re-read this part of the file and re-apply these two maps."

If you explicitly call .cache(), you change this behavior. When you cache an RDD (like messages.cache() in the example), Spark executes the steps once, calculates the result, and then keeps that result in memory. Future requests don't need to look at the "file name" or re-run the "steps" anymore; they just grab the finished data from RAM.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
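
A short sketch of the difference (again assuming sc and a hypothetical file app.log):

lines = sc.textFile("app.log")
upper = lines.map(lambda s: s.upper())

upper.count()    # action: reads the file and applies map
upper.count()    # recomputed from the file again (nothing was kept)

upper.cache()    # mark the RDD for in-memory persistence
upper.count()    # computed once more; the result is now kept in RAM
upper.count()    # served from the cached partitions, no re-read of the file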

Operations Overview

Spark operations are categorized into two types:

  • Transformations: Operations that create a new RDD from an existing one (e.g., map, filter, groupBy).
  • Actions: Operations that return a result to the driver program or write data to storage (e.g., count, collect, save).

Map-like Operations (Transformations)

Spark provides an enriched instruction set compared to standard MapReduce.

map(f)

  • Logic: f is given one value of type T and returns one value of type U.
  • Execution:
    • Each worker calls f(x) on each item x from RDD_in.
    • Each worker puts the returned value into a partition of RDD_out.
  • Cardinality: One-to-one mapping.
  • Analogy: Replacing every number in a list with its square.

filter(f)

  • Logic: f takes a value of type T and returns a Boolean.
  • Execution: Keeps only elements where f(x) is true.
  • Relationship: One-to-Zero/One.
  • Analogy: Keeping only the even numbers in a list.

flatMap(f)

  • Logic: f is given one value of type T and returns an iterator/iterable collection producing values of type U.
  • Execution:
    • Each worker calls f(x) on items from RDD_in.
    • The worker traverses the iterable returned by f(x) and adds each value to RDD_out.
  • Relationship: One-to-Many.
  • Concept: Similar to map, but "flattens" the result. Instead of getting an RDD of lists, you get an RDD of the individual elements contained in those lists.
  • Analogy: Taking a list of sentences and splitting them into a list of words.

mapPartitions(f)

  • Logic: f is given an iterator that produces values of type T (an entire partition) and returns an iterator producing values of type U.
  • Behavior: f is not called on a single element, but on an iterator representing an entire partition of data. It returns a new iterator for the output.
  • Why use it?: It allows for bulk optimizations. Because f is called once per partition (rather than once per element), you can perform expensive setup operations (like opening a database connection) just once, reuse it for all items in that partition, and then close it.
  • Use Case: useful when you need MapReduce-style setup and cleanup operations (e.g., opening a database connection once per partition rather than once per record).
  • Relationship: Many-to-Many (Partition level).
  • Analogy: Instead of hiring a taxi for every single person (element) to go to the airport, you hire a bus (partition) that takes everyone at once, saving the "setup cost" of hailing a ride multiple times.
  • Pseudocode Structure:

    def myFunction(values):
        setup things (e.g., open connection)
        for x in values:
            something, probably accumulating values
        cleanup that returns accumulated values
    
    # Usage Example
    def process_partition(values):
        # SETUP (Runs once per partition)
        conn = open_database_connection()
    
        results = []
        # LOOP (Runs for every item in this partition)
        for x in values:
            data = conn.query(x)
            results.append(data)
    
        # CLEANUP (Runs once per partition)
        conn.close()
    
        return results
    
    # Spark calls this function 12 times (if you have 12 partitions)
    rdd.mapPartitions(process_partition)
    
| Operation | Input (to function f) | Output (from function f) | Main Use Case |
| --- | --- | --- | --- |
| map | Single element | Single element | Simple transformation |
| filter | Single element | Boolean | Removing unwanted data |
| flatMap | Single element | List/Iterator | Splitting items or changing cardinality |
| mapPartitions | Iterator (entire partition) | Iterator | Efficiency (setup/cleanup per partition) |

Reduce-like Operations

Important distinction from MapReduce:

These operations do not sort. They use in-memory hash tables for the shuffle, since Spark assumes significantly more RAM is available than MapReduce was designed for.

groupByKey

  • Function: Acts like the shuffle phase of MapReduce. RDD[(K, V)] -> RDD[(K, Iterable[V])]
  • Behavior: It brings pairs with the same key to a single place.
  • Note: This is just the shuffle; it does not perform the reduce part. You would typically follow this with a map or flatMap to perform the actual reduction logic.
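
A hedged PySpark sketch of that pattern, assuming a SparkContext sc; the reduction after groupByKey is done here with mapValues:

pairs = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])

grouped = pairs.groupByKey()                   # RDD[(K, Iterable[V])] -- shuffle only
sums = grouped.mapValues(lambda vs: sum(vs))   # the actual "reduce" step
# Equivalent, and usually cheaper (see reduceByKey below): pairs.reduceByKey(lambda x, y: x + y)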

reduceByKey(f)

  • Function: Like MapReduce's shuffle + combine + reduce.
  • Worker Execution (Step-by-step):

    1. Create a Hash Table called HT.
    2. For each (K,V) pair in the input RDD:

      • If k is not in HT: associate k with v in HT.
      • Otherwise: retrieve old value v_old, apply f(v_old, v), and replace the entry in HT.

      Each worker processes its own data and shrinks it down using the Hash Table. This is the "combine" phase to reduce the amount of data that needs to be sent over the network.

    3. Perform a shuffle: each reducer-like worker receives key-value pairs.

      This happens once. The workers exchange the data they just compressed in the previous step. This step moves data across the network. It ensures that the partial result ("Hello", 5) from Worker A and ("Hello", 3) from Worker B are both sent to the same "reducer-like worker".

    4. Repeat step 2 for all received pairs.

      The workers receive the data from the shuffle and run the exact same Hash Table logic (Step 2) one last time to get the final result.
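
The per-worker hash-table logic of steps 2 and 4 can be sketched in plain Python (this is just an illustration of the idea, not Spark code):

def local_combine(pairs, f):
    ht = {}                                        # step 1: the hash table
    for k, v in pairs:                             # step 2
        ht[k] = v if k not in ht else f(ht[k], v)
    return list(ht.items())

add = lambda a, b: a + b

# Map side: each worker shrinks its own data (the "combine")
worker_a = local_combine([("hello", 1), ("hi", 1), ("hello", 1)], add)  # [("hello", 2), ("hi", 1)]
worker_b = local_combine([("hello", 1), ("hi", 1)], add)                # [("hello", 1), ("hi", 1)]

# Step 3 (shuffle) routes equal keys to the same worker; step 4 reruns the same logic
final = local_combine(worker_a + worker_b, add)   # [("hello", 3), ("hi", 2)]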

aggregateByKey(zero, insert, merge)

  • Function: A more complex version of reduceByKey allowing for different input and output types.
  • Parameters:
    • Zero: Initial value.
    • Insert (seqOp): (U, V) => U. Adds a value V to an accumulator U.
    • Merge (combOp): (U, U) => U. Merges two accumulators.
  • Worker Execution:
    1. Create a Hash Table HT.
    2. For each (K,V) pair in input:
      • If k is not in HT: associate k with insert(zero, v).
      • Otherwise: retrieve accumulator u, and replace it with insert(u, v).
    3. Perform shuffle.
    4. Use the merge parameter to combine accumulators from different workers.
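
A hedged PySpark sketch of aggregateByKey where the accumulator type (a set) differs from the input value type (assumes a SparkContext sc; the data is made up):

pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("a", 2)])

zero = set()                             # initial accumulator for each key
insert = lambda acc, v: acc | {v}        # (U, V) => U, runs locally on each worker
merge = lambda acc1, acc2: acc1 | acc2   # (U, U) => U, runs after the shuffle
# note: | returns a new set, so the zero value is never mutated

distinct_values = pairs.aggregateByKey(zero, insert, merge)
# distinct_values.collect() -> [("a", {1, 2}), ("b", {3})] (order may vary)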

Differences between reduceByKey and aggregateByKey

1. Data Type Constraints

  • reduceByKey(f):
    • Constraint: The input type and the output type must be the same.
    • Signature: f: (V, V) => V.
    • Use Case: Ideal for simple mathematical operations like summing or max/min where adding two integers results in an integer.
  • aggregateByKey(zero, insert, merge):
    • Flexibility: The input type (V) can be different from the output/accumulator type (U).
    • Signature: It uses a Zero value of type U, an insert function (U, V) => U, and a merge function (U, U) => U.
    • Use Case: Necessary when you need to change the format of the data during aggregation (e.g., taking a list of single numbers and returning a List or Set of those numbers).

2. Logic & Parameters

  • reduceByKey is simpler. It takes one function (f) and applies it everywhere:
    • It uses f to combine values locally on the worker.
    • It uses the same f to merge results after the shuffle.
  • aggregateByKey is "more complicated" and takes three parameters to handle the type conversion:
    1. zero: The initial starting value for the accumulator.
    2. insert: How to add a new raw input item into the accumulator (runs locally).
    3. merge: How to combine two accumulators together (runs after shuffle).

Technical Note: combineByKey

  • combineByKey is technically the only "real" function underlying these operations.
  • aggregateByKey calls combineByKey(lambda v: insert(zero, v), insert, merge).
  • reduceByKey(f) calls combineByKey(identity, f, f).
  • combineByKey allows providing a specific function to create an accumulator from a single value V (instead of using a generic zero value).

1. What is combineByKey?

combineByKey is the most generic function for aggregating data by key. It handles the specific challenge of: "How do I start combining data when I see a key for the very first time?"

It takes three arguments to handle the lifecycle of an aggregation:

  1. Create Combiner (createCombiner): What to do when you see a key you have never seen before in this partition. You need to transform that first raw value V into an accumulator C.
  2. Merge Value (mergeValue): What to do when you see a key that already exists in your current worker's Hash Table. You combine the new raw value V into the existing accumulator C.
  3. Merge Combiners (mergeCombiners): What to do after the shuffle, when you have two accumulators C (one from Worker A, one from Worker B) that need to be merged.

2. How reduceByKey maps to it

reduceByKey(f) is a special case where the input type and accumulator type are the same.

  • Goal: Sum numbers, or find the max.
  • The Translation:
    • Create Combiner: identity. If the first value is 5, the accumulator starts as 5.
    • Merge Value: f. If the accumulator is 5 and the next value is 3, run f(5, 3).
    • Merge Combiners: f. If Worker A has 8 and Worker B has 10, run f(8, 10).

In Code:

combineByKey(lambda v: v, f, f).

3. How aggregateByKey maps to it

aggregateByKey(zero, insert, merge) is the special case where you provide a fixed zero value to start from.

  • Goal: Create a list of names, or a complex object.
  • The Translation:
    • Create Combiner: Instead of just using the value v, it takes the user-provided zero and inserts v into it immediately: insert(zero, v).
    • Merge Value: insert.
    • Merge Combiners: merge.

In Code:

combineByKey(lambda v: insert(zero, v), insert, merge).

Side note: if you put a print statement inside the map function you define and the RDD is not cached, the order of the printed lines may vary from run to run, because the partitions are processed in parallel by different workers.

Example Application: Log Mining

  • Goal: Load error messages from a massive log file into memory to perform interactive searches for various patterns.
  • Performance Benchmark: A full-text search of Wikipedia (60GB on 20 EC2 machines) took 0.5 seconds in memory versus 20 seconds on disk.

Code Workflow

The process involves a chain of transformations followed by actions.

  • 1. Base RDD Creation:
    • lines = spark.textFile("hdfs://...")
    • Records the HDFS path, figures out how the file will be split into partitions, and creates a base RDD object (metadata + instructions).
  • 2. Transformations (Lazy):
    • errors = lines.filter(lambda s: s.startswith("ERROR"))
      • Filters the raw lines to keep only error logs.
    • messages = errors.map(lambda s: s.split("\t")[2])
      • Extracts the relevant column (message content) from the log line.
  • 3. Caching:
    • messages.cache()
      • Crucial Step: This instructs Spark to store the messages RDD in memory after it is computed.
  • 4. Actions (Trigger Execution):
    • messages.filter(lambda s: "mysql" in s).count()
    • messages.filter(lambda s: "php" in s).count()
    • These lines trigger the actual computation and return results (counts) to the driver.

Execution Architecture

  • Driver: The central controller that defines the RDDs and orchestrates execution. It sends tasks to the workers.
  • Workers & Blocks:
    • Initially, workers read data blocks (Block 1, Block 2, Block 3) from the disk.
  • The Role of Caching:
    • Because messages.cache() was called, the workers store the processed data in their local RAM (Cache 1, Cache 2, Cache 3) after the first pass.
    • When the second action (searching for "php") runs, the workers read directly from Cache rather than going back to the slow disk Blocks.

Key Concepts & Footnotes

Lazy Evaluation

  • Definition: Spark does not execute any transformations immediately. It records the "recipe" (lineage) but does nothing until it reaches an Action (like count).
  • Benefit: This allows Spark to optimize the execution plan and ensures it only loads the data that is strictly needed for the final evaluation.

Caching Strategy

  • Rule: "If you branch, then you cache!"
  • Meaning: If you intend to use the same RDD multiple times for different operations (branching the logic), you must cache it. If you do not, Spark's lazy evaluation will force it to re-compute the entire chain from the raw disk data for every single action.

Phenomenon: Over-Partitioning and Scheduling Overhead

>>> data = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(data, 12)
>>> rdd.collect()
[1, 2, 3, 4, 5]
>>> rdd.glom().collect()
[[], [], [1], [], [2], [], [], [3], [], [4], [], [5]]

Spark's primary goal is to spread work evenly across the entire cluster. With more partitions than elements, many partitions end up empty, yet every partition still becomes a task that has to be scheduled and launched; this scheduling overhead is pure waste when the partitions are tiny or empty.

Fault Recovery

RDDs provide fault tolerance through lineage. Instead of replicating data across nodes (which is expensive), RDDs track the sequence of operations used to build them. If a partition of data is lost, Spark uses this lineage information to efficiently recompute only the missing data.

Lineage Example Diagram

  • HDFS File -> filter -> Filtered RDD -> map -> Mapped RDD
  • Concept: If the "Mapped RDD" is lost, Spark consults the dependency chain, sees it came from "Filtered RDD", and re-executes the map function to restore it.
msgs = textFile.filter(lambda s: s.startswith("ERROR")) \
               .map(lambda s: s.split("\t")[2])

Programming with RDD’s

SparkContext

  • Definition: The main entry point to Spark functionality.
  • Shell Usage: In the interactive shell, it is available as the variable sc.
  • Standalone Usage: In standalone scripts/programs, you must initialize your own SparkContext.
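
A minimal standalone-script sketch (the app name and local master URL are just placeholder choices):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("MyApp").setMaster("local[*]")
sc = SparkContext(conf=conf)

print(sc.parallelize([1, 2, 3]).count())   # => 3

sc.stop()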

Creating RDDs

There are two main methods to create an RDD:

From a Collection (Parallelizing)

Turns a local Python collection into a distributed dataset.

# Turn a Python collection into an RDD
sc.parallelize([1, 2, 3])

From External Storage

Loads data from Local FS, HDFS, or S3.

# Load text file from local FS, HDFS, or S3
sc.textFile("file.txt")
sc.textFile("directory/*.txt")
sc.textFile("hdfs://namenode:9000/path/file")

Basic Transformations

Transformations create a new RDD from an existing one. They are lazy (not computed immediately).

  • map: Passes each element through a function.
    • nums.map(lambda x: x*x) -> {1, 4, 9}
  • filter: Keeps elements passing a predicate (boolean check).
    • squares.filter(lambda x: x % 2 == 0) -> {4}
  • flatMap: Maps each element to zero or more output elements. Useful for flattening nested structures.
    • nums.flatMap(lambda x: range(x)) -> {0, 0, 1, 0, 1, 2}
    • range(x) produces the values 0, ..., x-1, so each element x expands into x output elements.

Basic Actions

Actions trigger computation and return results to the driver or storage.

nums = sc.parallelize([1, 2, 3])

# Retrieve RDD contents as a local collection
nums.collect() # => [1, 2, 3]

# Return first K elements
nums.take(2) # => [1, 2]

# Count number of elements
nums.count() # => 3

# Merge elements with an associative function
nums.reduce(lambda x, y: x + y) # => 6

# Write elements to a text file
nums.saveAsTextFile("hdfs://file.txt")

Working with Key-Value Pairs

Spark's "distributed reduce" operations require RDDs containing Key-Value pairs.

Syntax Comparison

  • Python: Uses standard tuples (a, b).
    • Access via pair[0] and pair[1].
  • Scala: Uses tuples (a, b).
    • Access via pair._1 and pair._2.
  • Java: Uses Tuple2(a, b).
    • Access via pair._1() and pair._2().

Some Key-Value Operations

Specific transformations for pair RDDs:

pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])

# reduceByKey: Groups by key and sums values
pets.reduceByKey(lambda x, y: x + y)
# => {(cat, 3), (dog, 1)}

# groupByKey: Groups values into a list (expensive)
pets.groupByKey()
# => {(cat, [1, 2]), (dog, [1])}

# sortByKey: Sorts by the key
pets.sortByKey()
# => {(cat, 1), (cat, 2), (dog, 1)}

Word Count Example (The "Hello World" of MapReduce)

The standard flow for a Word Count application:

  1. flatMap: Split lines into words.
  2. map: Convert words to (word, 1) pairs.
  3. reduceByKey: Sum counts for the same word.

Diagram Flow

"to be or" -> (to, 1), (be, 1), (or, 1) "not to be" -> (not, 1), (to, 1), (be, 1)The shuffle step (reduceByKey) aggregates these:to: (1, 1) -> 2 be: (1, 1) -> 2

Python Implementation

lines = sc.textFile("hamlet.txt")
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)
counts.saveAsTextFile("results")

Scala Implementation

Standard Syntax

val textFile = sc.textFile("hamlet.txt")

textFile.flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey((x, y) => x + y)
        .saveAsTextFile("results")

Alternative Syntax (Shorthand)

Scala allows the use of underscores _ as placeholders for anonymous function parameters.

val textFile = sc.textFile("hamlet.txt")

textFile.flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)
        .saveAsTextFile("results")

Syntax Notes

  • In Scala, underscores mean "this expression is the body of an anonymous function".
  • _ + _ is equivalent to (x, y) => x + y.

Advanced Key-Value Operations

Beyond standard reductions, Spark provides operators for combining multiple RDDs based on their keys.

  • join: Performs an inner join on two RDDs. Result contains only keys present in both RDDs.
    • Example: merging visit logs with page names.
    • Result format: (Key, (Value_Left, Value_Right))
  • cogroup: Groups data from two RDDs sharing the same key.
    • Concept: It is essentially a full outer join that collects values into lists rather than flattening them.
    • Result format: (Key, ([List of Values from RDD1], [List of Values from RDD2]))
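
A hedged PySpark sketch of both operators, using the visit-log/page-name example above (assumes a SparkContext sc; the data is made up):

visits = sc.parallelize([("index.html", "1.2.3.4"),
                         ("about.html", "3.4.5.6"),
                         ("index.html", "1.3.3.1")])
pageNames = sc.parallelize([("index.html", "Home"), ("about.html", "About")])

visits.join(pageNames).collect()
# -> [("index.html", ("1.2.3.4", "Home")),
#     ("index.html", ("1.3.3.1", "Home")),
#     ("about.html", ("3.4.5.6", "About"))]   (order may vary)

visits.cogroup(pageNames).collect()
# -> [("index.html", (<iterable of IPs>, <iterable of names>)),
#     ("about.html", (<iterable of IPs>, <iterable of names>))]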

Setting Parallelism

  • Manual Configuration: All pair RDD operations (like reduceByKey, join) accept an optional second argument to set the number of tasks.
    • Example: words.reduceByKey(func, 5) forces 5 output partitions.
    • Note that for textFile, the second argument is only a minimum number of partitions, not an exact count.
    • If a file is split into 16 blocks on HDFS and you open it with textFile(PathString, 10), you'll still get 16 partitions, not 10.
  • Rule of Thumb: Match partitions to available cores (e.g., 8 vCores = 8 partitions). Ideally, have more tasks than cores so CPU isn't idle while waiting for I/O.
  • Default Behaviors:
    • Derived RDDs: usually inherit the partition count from the parent RDD.
    • Joins and other shuffles: by default use the largest of the parents' partition counts (or a parent's existing partitioner, if one is set).
    • Global Default: Can be set via spark.default.parallelism (used as the default for shuffle operations and parallelize; base RDDs such as textFile take their partition count from the input splits).

Spark Architecture

System Components Diagram

The architecture consists of three main parts:

  1. Driver Program: The "Brain." Contains the SparkContext and the main application code. It defines the RDDs and orchestrates execution.
  2. Cluster Manager: Allocates resources.
  3. Worker Nodes: The "Muscle." Physical machines that run the actual computations.
    • Executor: A process running on a worker node. It holds the Cache and executes Tasks sent by the driver.

Shared Variables: Broadcasts & Accumulators

Broadcast Variables

  • The Problem: When you use a variable (like thresh = 5) inside a lambda function (e.g., map), Spark serializes that variable and sends a copy to every single task. If the variable is large (like a lookup table), this causes massive network overhead.
    • There are multiple tasks per executor.
    • Spark's "closure capture" mechanism automatically serializes any variable referenced inside a transformation function and packages it along with the function code to send to the cluster. Crucially, this package is sent separately for every single task, not just once per machine. Since a single Executor runs multiple tasks (often concurrently), shipping a large variable (like a massive lookup table) inside every task's closure produces redundant network traffic and memory usage. This per-task inefficiency is exactly what broadcast variables solve by sending the data only once per Executor.
  • The Solution (Broadcast): Using sc.broadcast(), Spark sends the read-only variable from the Driver to each Executor (machine) only once, rather than once per task.

    thresh = sc.broadcast(5)
    myRdd.filter(lambda x: x > thresh.value)

  • Immutability: Broadcast variables are read-only. Attempting to modify them on a worker will throw an error, whereas modifications to ordinary global variables fail silently.

Accumulators (Executor -> Driver)

  • The Problem: Workers cannot simply update a global variable on the Driver (e.g., count += 1) because they are running on different machines.
  • The Solution: Accumulators allow workers to "add" to a variable that only the Driver can read.
    • Usage: Primarily used for counters, debugging metrics, or sums.
  • Code Example (Scala):

    val lineCounter = sc.longAccumulator
    // Inside transformation:
    lineCounter.add(1)
    // On Driver:
    lineCounter.value
    
  • Types of accumulators: longAccumulator, doubleAccumulator. The driver can inspect the value (and the average of the accumulated values); workers can only add to it.
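
A rough PySpark equivalent of the Scala example above (assumes a SparkContext sc and the hamlet.txt file used elsewhere in these notes):

line_counter = sc.accumulator(0)

def note_line(s):
    line_counter.add(1)          # workers can only add
    return s

sc.textFile("hamlet.txt").map(note_line).count()   # an action must run first
print(line_counter.value)                          # only the driver can read it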

Under the Hood: The DAG Scheduler

Spark does not execute steps one by one. It builds a Directed Acyclic Graph (DAG) of the entire job before running it.

  • Pipelining: Spark fuses operations together. If you have map -> filter, Spark will process one data element through both functions in a single pass without writing intermediate results to disk.
  • Stages: The DAG Scheduler breaks the job into "Stages" based on where data needs to be shuffled.
    • Diagram Explanation:
      • Stage 1 & 2: Operations like map or filter happen independently on local data (pipelined).
      • Stage 3: The arrows crossing between stages represent a Shuffle. Data must be redistributed across the network (e.g., for a join or groupBy), forcing a stage boundary.

Physical Operators: Dependencies

Dependencies describe how RDD partitions relate to one another.

Narrow Dependency (Fast)

  • Concept: Each partition of the parent RDD is used by at most one partition of the child RDD.
  • Diagram: Straight arrows ====>. Data stays on the same machine.
  • Examples: map, filter, union, join (only if parents are co-partitioned).

Wide Dependency (Slow)

  • Concept: Multiple child partitions depend on multiple parent partitions. This requires a Shuffle (moving data across the network).
  • Diagram: Criss-crossed arrows XXXX. Every node has to talk to every other node.
  • Examples: groupByKey, reduceByKey, join (standard inputs).

Partitioners

By default, Spark uses Hash Partitioning for shuffles (similar to MapReduce). However, you can override this.

  • Hash Partitioner (Default): Distributes keys based on hash code (like MapReduce).
  • Range Partitioner: Divides keys into sorted ranges (e.g., A-M, N-Z). This is used for sorting operations.
  • Custom Partitioner: You can define your own logic (e.g., strict modulo arithmetic) to control exactly which key goes to which machine.
import org.apache.spark.Partitioner

class myPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case x: Int => x % numPartitions   // whatever logic you want, modulo numPartitions (assumes non-negative keys)
    case _      => throw new IllegalArgumentException("unsupported key type")
  }

  // You should also implement equals() (see below), for example:
  override def equals(other: Any): Boolean = other match {
    case p: myPartitioner => p.numPartitions == numPartitions
    case _                => false
  }
}

// Usage: passing the custom partitioner to a transformation
rdd.reduceByKey(new myPartitioner(5), _ + _)

The Importance of equals()

  • Concept: You must implement the equals method in your custom partitioner class.
  • Why? Spark checks if the new RDD's partitioner is equivalent to the current RDD's partitioner.
  • Optimization: If equals returns true, Spark knows the data is already partitioned according to that rule and can skip an unnecessary shuffle (network transfer). Without it, Spark assumes the partitioning is different and reshuffles the data even if it's already in the right place.

Scala Syntax Note

  • Concept: The slide notes a Scala "oddity" where the language makes little distinction between a zero-argument method and a value field.
  • Application: This allows you to override the abstract method def numPartitions: Int from the parent class using a simple value field val numPartitions: Int in your implementation.

In Spark, every time you create new myPartitioner(), you are creating a distinct object in memory.

  • Even if the logic is identical, Object_A != Object_B by default in programming.
  • By overriding equals(), you tell Spark: "Even though these are two different objects in memory, they represent the same mathematical rule."

    The "Mailroom" Analogy

    Think of the data as letters and the partitioner as a sorting rule.

    • Step 1 (The Sort): You hire a worker (Partitioner A) to sort all letters into bins by Zip Code.
    • Step 2 (The Join): You hire a new worker (Partitioner B) to match those letters with packages.

    Before Partitioner B starts working, they look at the bins and ask: "Are these letters already sorted by Zip Code?"

    • If equals() returns True: Partitioner B says: "Oh, Partitioner A used the exact same rule as me! The letters are already in the right bins. I don't need to re-sort them." -> Shuffle Skipped (Fast)
    • If equals() returns False (or is missing): Partitioner B says: "I don't know who Partitioner A is or what logic they used. I can't trust these bins." Partitioner B dumps all the bins onto the floor and re-sorts everything from scratch. -> Unnecessary Shuffle (Slow)

RDD Reduction Operations: reduce vs reduceByKey

It is important to distinguish between the standard reduce (from MapReduce) and Spark's specific reduceByKey transformation.

Reduce (MapReduce Style)

  • Signature: (K2, Iterable[V2]) => List[(K3, V3)]
  • Mechanism: Key-Value pairs are partitioned and shuffled by a Partitioner. The reduce job then processes keys in sorted order.

reduceByKey (Spark)

  • Signature: (V, V) => V (values combine into a value of the same type)
    • This means the function you pass to reduceByKey cannot change the data type of the values: the input type and the output type must be identical.
  • Transformation: RDD[(K, V)] => RDD[(K, V)]
  • Flexibility: It is less flexible than a generic reduce but highly optimized for its specific purpose (combining values).
  • Optimization (Combiner): Crucially, it performs a local reduction (combiner) on the mapper side before the shuffle occurs. This significantly reduces network traffic. It then performs the final reduction after the shuffle.
  • You cannot define a custom combiner structure: input and output types must be the same (e.g., Int -> Int), and Spark handles the combiner logic for you, applying your function automatically in two places:
    1. Map Side (As the Combiner): It runs locally on each partition to "pre-reduce" the data before sending it over the network.
    2. Reduce Side (As the Reducer): It runs again after the shuffle to compute the final result.

Advanced Aggregations: combineByKey

When reduceByKey is not enough (e.g., when the return type differs from the input value type), combineByKey provides fine-grained control.

Mechanism

It uses three functions to transform an RDD of (K, V) into (K, C):

  1. createCombiner (V => C): Called when a key is encountered for the first time in a partition. It initializes a "Collector" (accumulator) C from the first value V.
  2. mergeValue ((C, V) => C): Called when the key is already present in the partition. It adds a new value V to the existing Collector C.
  3. mergeCombiners ((C, C) => C): Called after the shuffle to merge Collectors coming from different partitions.
  • Note: reduceByKey is actually implemented by calling combineByKey with identical input and output types: reduceByKey(reduce) calls combineByKey(identity, reduce, reduce).
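
A hedged PySpark sketch of the classic use case, a per-key average, where the accumulator C = (sum, count) differs from the input value V (assumes a SparkContext sc; the data is made up):

ratings = sc.parallelize([("movie1", 4.0), ("movie1", 5.0), ("movie2", 3.0)])

sums_counts = ratings.combineByKey(
    lambda v: (v, 1),                               # createCombiner: V => C
    lambda c, v: (c[0] + v, c[1] + 1),              # mergeValue: (C, V) => C
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1])   # mergeCombiners: (C, C) => C
)
averages = sums_counts.mapValues(lambda c: c[0] / c[1])
# averages.collect() -> [("movie1", 4.5), ("movie2", 3.0)]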

Advanced Aggregations: aggregateByKey

aggregateByKey sits between reduceByKey and combineByKey in terms of complexity.

Mechanism

Instead of a createCombiner function, it takes a zero value.

  • Signature: aggregateByKey(zeroValue)(seqOp, combOp)
  • Zero Value: Used to initialize the accumulator for each key.
    • Serialization: The zero value is serialized and deserialized each time it is needed, ensuring it is "fresh" and avoiding mutation bugs.
    • Warning: If you use a mutable object (like a Set) as the zero value, you must ensure your merge logic actually returns the modified set, or you might end up with empty sets.

Currying and Type Inference

In Scala, this function is curried (split into two parameter lists): rdd.aggregateByKey(zero)(append, merge)

  • Reason: This helps the compiler's type inference. It infers the type C from the zero value in the first group, then uses that known type to check the functions in the second group. Python (being dynamic) does not need this.

Deep Dive: combineByKey vs. aggregateByKey

Both functions are used when the result type C is different from the input type V (e.g., calculating an average where Input=Int, Result=(Int, Int)). The main difference is how they start.

1. combineByKey (The "Function" Approach)

  • How it starts: It uses a function to transform the very first value it sees into the accumulator.
  • Logic: "I found the first number 10. I will run createCombiner(10) to turn it into (10, 1)."
  • Signature: Takes 3 functions:
    1. createCombiner: V => C (Initialization)
    2. mergeValue: (C, V) => C (Adding to local accumulator)
    3. mergeCombiners: (C, C) => C (Merging partitions)

2. aggregateByKey (The "Zero Value" Approach)

  • How it starts: It uses a static zero value that you provide upfront.
  • Logic: "I am starting with (0, 0). I found the first number 10. I will add it to my zero value to get (10, 1)."
  • Signature: Takes a zero value and 2 functions:
    1. zeroValue: C (Initialization)
    2. seqOp: (C, V) => C (Adding to local accumulator)
    3. combOp: (C, C) => C (Merging partitions)

Summary: When to use which?

| Scenario | Use This | Why? |
| --- | --- | --- |
| Simple "empty" state | aggregateByKey | You have a natural "zero" (like 0, an empty list, or ""). It is cleaner and easier to read because you don't need to write a special initialization function. |
| Complex initialization | combineByKey | You cannot create a generic "empty" value. For example, if the accumulator structure depends on what the first value is (e.g., "start a set with this specific ID"), you need the dynamic control of combineByKey. |
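
For comparison, the same per-key average as in the combineByKey sketch above, written with aggregateByKey: the merge logic is identical, only the initialization changes from a function to an explicit zero value (hedged sketch, assumes sc):

ratings = sc.parallelize([("movie1", 4.0), ("movie1", 5.0), ("movie2", 3.0)])

sums_counts = ratings.aggregateByKey(
    (0.0, 0),                                       # zeroValue: C
    lambda c, v: (c[0] + v, c[1] + 1),              # seqOp: (C, V) => C
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1])   # combOp: (C, C) => C
)
averages = sums_counts.mapValues(lambda c: c[0] / c[1])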

The groupByKey Warning

Avoid groupByKey if possible.

  • The Issue: groupByKey shuffles all values for a key across the network to form a list (K, List[V]). If you immediately reduce that list (e.g., to sum it), you have wasted massive bandwidth shipping data that could have been summed locally first.
  • Alternatives:
    • If you want to reduce the list -> Use reduceByKey.
    • If you want to transform the list -> Use aggregateByKey or combineByKey.
  • Note: Only use groupByKey if your downstream operation truly needs access to the entire list of values simultaneously and cannot process them iteratively.

Execution Flow: reduceByKey Visualized

Contrary to the MapReduce name, Spark's reduceByKey is smarter.

Diagram Explanation

  1. Map Phase (Local):
    • Data starts on different partitions (e.g., (100, (4.0, 1))).
    • Combiner: Spark locally sums values for the same key before sending them.
    • Example: On the green partition, (100, 9.0, 2) and (200, 5.0, 1) are computed locally.
  2. Shuffle Phase:
    • Only the combined totals are sent across the network to the reducers.
  3. Reduce Phase:
    • The final sums are computed: (100, (2430, 517)).

Managing Output Files: coalesce vs repartition

When saving data (e.g., saveAsTextFile), the number of output files equals the number of partitions.

The Problem

If you have 1000 partitions but small data, you end up with 1000 tiny files.

The Solutions

  • repartition(n): Reshuffles all data across the network to create n balanced partitions. Expensive (full shuffle).
  • coalesce(n): Reduces the number of partitions without a full shuffle. It merges existing partitions onto the same machine (e.g., P1 + P2 -> New_P1).
    • Pros: Much faster than repartition.
    • Cons: Can result in unbalanced partitions (if one machine gets all the big partitions).
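
A small PySpark sketch of the two calls (assumes a SparkContext sc; the output path is a placeholder):

rdd = sc.parallelize(range(1000), 100)   # 100 small partitions

merged = rdd.coalesce(10)                # merges partitions locally, no full shuffle
balanced = rdd.repartition(10)           # full shuffle, evenly sized partitions

merged.getNumPartitions()                # => 10
merged.saveAsTextFile("output")          # writes 10 files instead of 100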

Code Example: Average Calculation (Scala)

This example calculates the average rating per movie ID from a CSV.

sc.textFile("movies.csv")
  .map(_.split(","))
  // Map to (MovieID, (Rating, 1)) for counting
  .map(lst => (lst(1).toInt, (lst(2).toDouble, 1)))
  // Sum ratings and counts per movie
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  // Divide sum by count to get average
  .mapValues { case (sum, cnt) => sum / cnt }
  // Merge to 1 partition to get a single output file
  .coalesce(1)
  .saveAsTextFile("averages")

Notes from Slide

  • Pattern Matching: The syntax { case ((s1, c1), (s2, c2)) => ... } is powerful and avoids "ugly" tuple access like p1._1.
  • Naming Tip: When live coding, be careful with variable names like cnt or count to avoid typos that result in embarrassing words.

Debugging: toDebugString

You can view the DAG (Directed Acyclic Graph) lineage of an RDD using print(rdd.toDebugString()).

Output Interpretation (Bottom-to-Top)

  1. HadoopRDD / MapPartitionsRDD: Loading the text file and splitting lines (Stage 1).
  2. ShuffledRDD: The boundary where reduceByKey triggered a shuffle (Stage 2).
  3. CoalescedRDD: The result of coalesce(1) merging partitions.

This text output directly maps to the logical steps the scheduler will execute.