Analyzing Graphs

Graph Theory & Data Structures

Fundamentals

  • Definition: A graph \(G = (V, E)\) consists of a set of Vertices (Nodes) and a set of Edges (Links).
  • Types:
    • Directed: An edge \((u, v)\) goes from \(u\) to \(v\) but implies nothing about the return path.
    • Undirected: An edge \((u, v)\) implies \((v, u)\) automatically.
    • Weighted: Edges can carry numerical labels (weights).
  • Terminology:
    • Out-Neighbor: If edge \((u, v)\) exists, \(v\) is an out-neighbor of \(u\), and \(u\) is an in-neighbor of \(v\).
    • In-Degree / Out-Degree: The count of edges entering or leaving a specific vertex.
  • Sparsity: Most real-world graphs (Social Networks, The Web, Roadways) are sparse. The number of edges is typically closer to \(|V|\) than to the maximum possible \(|V|^2\).
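
The terminology above can be made concrete with a small sketch. The edge set and vertex numbers are made up for illustration; nothing here comes from a particular library beyond Python's standard `collections.Counter`.

```python
from collections import Counter

# Hypothetical directed graph: a set of (u, v) edges over four vertices.
vertices = {1, 2, 3, 4}
edges = {(1, 2), (1, 4), (2, 1), (3, 4)}

out_degree = Counter(u for u, _ in edges)  # edges leaving each vertex
in_degree = Counter(v for _, v in edges)   # edges entering each vertex

# Density: the fraction of the |V|^2 possible directed edges that exist.
density = len(edges) / len(vertices) ** 2

print(out_degree[1], in_degree[4], density)
```

A sparse graph is one where `density` stays close to zero as the vertex count grows.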

Graph Representations

Adjacency Matrix

An \(N \times N\) matrix \(M\) (with \(N = |V|\)) where \(M_{ij} = 1\) if an edge exists from \(v_i\) to \(v_j\), and \(0\) otherwise.

  • Structure: A dense grid representing all possible connections.
  • Pros:
    • Excellent for mathematical operations (Matrix Multiplication).
    • Fast edge-existence checks (\(O(1)\) lookup of a single cell).
    • Optimized for GPUs (CUDA loves matrices).
  • Cons:
    • Wasted Space: For sparse graphs, the matrix is almost entirely zeros.
    • Slow Iteration: Finding neighbors takes \(O(N)\) because you must scan the whole row, even if there is only 1 neighbor.
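
The trade-off in the list above can be sketched in a few lines. This is a minimal illustration on a made-up 4-vertex graph (vertices numbered 0 to 3); the helper names `has_edge` and `out_neighbors` are invented for this example.

```python
# Hypothetical 4-vertex directed graph, vertices numbered 0..3.
N = 4
edges = [(0, 1), (0, 3), (1, 0)]

# N x N grid of zeros; M[i][j] == 1 means there is an edge from i to j.
M = [[0] * N for _ in range(N)]
for i, j in edges:
    M[i][j] = 1

def has_edge(i, j):
    """O(1): a single cell lookup."""
    return M[i][j] == 1

def out_neighbors(i):
    """O(N): the whole row is scanned, however few neighbors exist."""
    return [j for j in range(N) if M[i][j] == 1]
```

Note that the matrix stores 16 cells to describe 3 edges, which is the wasted-space problem in miniature.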

Adjacency List

An array (or map) where entry \(i\) holds a list of only the out-neighbors of vertex \(v_i\).

  • Structure: Conceptually similar to a Postings List in search indices.
  • Pros:
    • Compact: Much smaller than a matrix for sparse graphs.
    • Fast Traversal: Finding out-neighbors is efficient because they are directly stored.
  • Cons:
    • Hard to find In-Neighbors: You cannot easily see "who points to me" without searching every other list (or building a second reverse-graph list).
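
A short sketch of the in-neighbor problem, assuming the adjacency list is a plain Python dict (the graph and the function names are illustrative). It shows both options mentioned above: scanning every list, or building the reverse graph once.

```python
# Hypothetical adjacency list: vertex -> list of out-neighbors.
adj = {1: [2, 4], 2: [1], 3: [4], 4: []}

def in_neighbors_by_scan(adj, target):
    """Scans every list: cost grows with the total number of edges."""
    return [u for u, outs in adj.items() if target in outs]

def reverse_graph(adj):
    """Builds the second, reversed adjacency list in one pass."""
    rev = {u: [] for u in adj}
    for u, outs in adj.items():
        for v in outs:
            rev.setdefault(v, []).append(u)
    return rev
```

Out-neighbors are a direct lookup (`adj[1]`), while in-neighbors need one of the two helpers.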

Edge List

A simple flat list of pairs representing edges (e.g., (1, 2), (1, 4), (2, 1)).

  • Structure: Just a collection of "from-to" tuples.
  • Pros:
    • Extremely simple to create (just append new edges).
    • Ideal for initial data ingestion.
  • Cons:
    • Hard to Query: Finding neighbors requires scanning the entire list. It is generally not used for processing, but rather as an intermediate format.
  • Life Hack: You can convert an Edge List into an Adjacency List using a groupByKey operation, keyed on the source vertex.
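
The edge-list-to-adjacency-list conversion can be sketched in plain Python. `group_by_key` below is a hand-written stand-in for a framework's groupByKey, not a call into any real library, and it uses the example edges from above.

```python
from collections import defaultdict

# The example edge list from above: (from, to) tuples.
edge_list = [(1, 2), (1, 4), (2, 1)]

def group_by_key(pairs):
    """Collects all values that share a key, like a groupByKey would."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

adjacency = group_by_key(edge_list)
print(adjacency)
```

One thing to watch for: vertex 4 has no outgoing edges, so it gets no entry at all unless it is added separately.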

Graph Algorithms & Scale

Common Graph Problems

Graph theory maps directly to real-world engineering challenges:

  • Shortest Path: Google Maps, Logistics.
  • Minimum Spanning Tree (MST) & Min-Cut: Utility line planning, Disease spread modeling.
  • Max-Flow: Airline scheduling.
  • Graph Coloring: Scheduling final exams (avoiding collisions).
  • Bipartite Matching: Dating sites (matching Set A to Set B).

The Web Graph Structure

  • Scale: The web is massive, with ~50 billion pages (vertices) and ~1 trillion links (edges).
  • Power Law: The degree distribution roughly follows a power law (\(P(k) \sim k^{-\gamma}\)): most pages have very few links (0 or 1), while a tiny fraction have millions. This "Long Tail" makes processing difficult, because the work is spread very unevenly across vertices.
  • "Dark Web": In this context, it simply refers to pages not indexed by search engines (e.g., behind logins), not necessarily illegal content. This is what is more commonly called the "Deep Web".
  • Big Data Definition: For graphs, even 60GB is considered "Big Data" because the complexity of traversing edges makes it computationally heavy, necessitating tools like Hadoop or Spark.

Parallel Graph Processing

  • The Paradigm: Most parallel graph algorithms consist of two steps:
    1. Local Computation: Calculating a value for a node based on its current state (Independent).
    2. Propagation: Sending that value to neighbors (Graph Traversal).
  • Best Representation: Adjacency Lists are the superior format here. They map naturally to MapReduce Key-Value Pairs (Key = Vertex, Value = Neighbor List).
  • Execution: "Propagation" is effectively a Shuffle/Reduce operation, where messages are grouped by destination vertex.
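
As a minimal sketch of the two-step paradigm, the example below computes in-degrees: each vertex's "local computation" is just the constant 1, which is propagated to its out-neighbors and then grouped by destination. The three phase functions are hand-written stand-ins for what a MapReduce framework would do; the graph is made up.

```python
from collections import defaultdict

# Hypothetical adjacency list: Key = Vertex, Value = Neighbor List.
adj = {"a": ["b", "c"], "b": ["c"], "c": []}

def map_phase(adj):
    for vertex, neighbors in adj.items():
        for m in neighbors:
            yield m, 1              # propagate a value to each out-neighbor

def shuffle(pairs):
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)  # messages grouped by destination vertex
    return grouped

def reduce_phase(grouped):
    return {vertex: sum(values) for vertex, values in grouped.items()}

in_degree = reduce_phase(shuffle(map_phase(adj)))
```

Shortest-path search follows the same shape, with a distance in place of the constant 1 and `min` in place of `sum`.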

Single-Source Shortest Path (SSSP)

  • The Problem: Find the shortest path from a single source node to all other nodes in the graph.
  • Sequential Solution (Dijkstra): The standard algorithm uses a global priority queue to visit the "unvisited node with the lowest distance" (\(D\)) and relax its edges.

  • MapReduce Failure: Dijkstra cannot be efficiently implemented on MapReduce.
    • Why? The step "Find node with Minimum \(D\)" requires global knowledge of the entire graph's state. It is not a local computation, breaking the parallelism.
  • The Parallel Solution: Instead of Dijkstra, distributed systems use Parallel Breadth-First Search (pBFS).
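
For reference, here is a compact sequential Dijkstra using Python's `heapq`, as a sketch of why the algorithm depends on global state. The graph is invented, and edge weights are assumed to be non-negative.

```python
import heapq
import math

def dijkstra(adj, source):
    """adj maps node -> list of (neighbor, weight); weights assumed non-negative."""
    dist = {n: math.inf for n in adj}
    dist[source] = 0
    heap = [(0, source)]            # the single, global priority queue
    while heap:
        d, u = heapq.heappop(heap)  # "unvisited node with the lowest distance"
        if d > dist[u]:
            continue                # stale entry: u was already settled
        for v, w in adj[u]:
            if d + w < dist[v]:     # relax the edge (u, v)
                dist[v] = d + w
                heapq.heappush(heap, (dist[v], v))
    return dist

adj = {"s": [("a", 1), ("t", 10)], "a": [("b", 1)], "b": [("t", 1)], "t": []}
print(dijkstra(adj, "s"))
```

Every `heappop` needs to see all candidate nodes at once, which is exactly the step that has no local, per-node equivalent in MapReduce.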

The Challenge: Unweighted Graphs & Iteration

  • Goal: Find the shortest path (fewest hops) from a source node to all other nodes.
  • Inductive Logic:
    • \(dist(\text{source}) = 0\)
    • \(dist(v) = 1 + \min_{u} dist(u)\) for \(v \neq \text{source}\), where \(u\) ranges over the in-neighbors of \(v\) (the nodes with an edge into \(v\)).
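
On a single machine, the inductive definition above is what an ordinary queue-based BFS computes. A minimal sketch, assuming a dict-based adjacency list (the graph is made up; node 5 is deliberately unreachable from the source):

```python
import math
from collections import deque

def bfs_distances(adj, source):
    """Hop distance from source to every node; unreachable nodes stay at infinity."""
    dist = {n: math.inf for n in adj}
    dist[source] = 0                    # dist(source) = 0
    queue = deque([source])
    while queue:
        u = queue.popleft()
        for v in adj[u]:
            if dist[v] == math.inf:     # first visit is the fewest-hops visit
                dist[v] = dist[u] + 1   # one hop more than in-neighbor u
                queue.append(v)
    return dist

adj = {1: [2, 4], 2: [3], 3: [], 4: [3], 5: [1]}
print(bfs_distances(adj, 1))
```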

Hadoop MapReduce Implementation

Implementing this on MapReduce requires preserving the graph structure across stateless jobs.

Let each key be a node \(n\), and let each value be the pair (\(d\), the current distance from the source to \(n\); the adjacency list of \(n\)):

Key: NodeID | Value: (Current_Distance, List_of_Neighbors)

  • The Mapper:
    • Emits an updated distance (\(d+1\)) to every out-neighbor \(m\). → \((m : d+1)\)
    • Crucially: Must also emit the original node structure \((n : (d, \text{adjacency list}))\) so the adjacency list is not lost for the next iteration.
      • So when we say "Next Iteration," we mean an entirely new MapReduce process that starts up, reads the file created by the previous process, and runs the same logic again.
      • 1 Iteration = 1 MapReduce Job.
  • The Reducer:
    • Receives multiple messages for a node ID: some are potential new distances (integers), and one is the Node object itself.
    • Updates the node's distance to the minimum of all received values.
  • Iteration Cycle: Since MapReduce jobs terminate, we must chain them. Job 1 reads from HDFS and writes to HDFS; Job 2 reads Job 1's output. We repeat this until the graph "settles".

Algorithm Pseudocode

The following Python-like pseudocode describes the Parallel Breadth-First Search (pBFS) logic used to replace Dijkstra's algorithm.

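import math
from dataclasses import dataclass, field

@dataclass
class Node:                         # Minimal Node structure (an assumption of this sketch)
    d: float = math.inf             # Best known distance from the source
    adjList: list = field(default_factory=list)  # IDs of out-neighbors

# Mapper: Propagates distances and preserves structure
# (yield plays the role of emit)
def map_node(id, node):
    yield id, node                  # Pass the node structure to next generation
    if node.d < math.inf:           # Unreached nodes have nothing to announce
        for m in node.adjList:
            yield m, node.d + 1     # Send "d+1" reachability to neighbors

# Reducer: Finds the shortest path discovered so far
def reduce_node(id, values):
    d = math.inf
    node = None

    for o in values:
        if isinstance(o, Node):     # Recover the Node object
            node = o
        else:                       # It's a distance message
            d = min(d, o)

    if node is None:                # ID seen only as a neighbor: no structure arrived
        node = Node()
    node.d = min(node.d, d)         # Update with shortest distance found
    yield id, node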
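
To see the iteration cycle end to end, the mapper and reducer logic above can be exercised in a single-process simulation. This is only a sketch: `run_job` stands in for one whole MapReduce job (map, shuffle, reduce), the `Node` dataclass and the four-node graph are assumptions made for the example, and no real Hadoop API is involved.

```python
import math
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class Node:
    d: float = math.inf                          # distance from the source
    adjList: list = field(default_factory=list)  # IDs of out-neighbors

def map_node(node_id, node):
    yield node_id, node                  # preserve the structure
    if node.d < math.inf:
        for m in node.adjList:
            yield m, node.d + 1          # offer d+1 to each out-neighbor

def reduce_node(node_id, values):
    best, node = math.inf, None
    for o in values:
        if isinstance(o, Node):
            node = o
        else:
            best = min(best, o)
    if node is None:
        node = Node()
    # A fresh Node is returned so the previous job's output stays intact
    # and the driver can compare old and new distances.
    return node_id, Node(min(node.d, best), node.adjList)

def run_job(graph):
    """One simulated MapReduce job: map, shuffle by key, reduce."""
    shuffled = defaultdict(list)
    for node_id, node in graph.items():
        for key, value in map_node(node_id, node):
            shuffled[key].append(value)
    return dict(reduce_node(k, vs) for k, vs in shuffled.items())

def parallel_bfs(graph):
    jobs = 0
    while True:
        new_graph = run_job(graph)       # 1 iteration = 1 job
        jobs += 1
        updates = sum(
            1 for k, n in new_graph.items() if n.d < graph.get(k, Node()).d
        )
        graph = new_graph
        if updates == 0:                 # the graph has settled
            return graph, jobs

graph = {
    "s": Node(0, ["a", "b"]),
    "a": Node(adjList=["c"]),
    "b": Node(adjList=["c"]),
    "c": Node(),
}
result, jobs = parallel_bfs(graph)
print({k: n.d for k, n in result.items()}, jobs)
```

In this example the farthest node is two hops away, and the loop runs one extra job whose only purpose is to confirm that nothing changed.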

The implementation of Single-Source Shortest Path (SSSP) on MapReduce requires a Node object that holds both the distance from the source (d) and the list of out-neighbors (adjList), so that the graph topology survives between stateless jobs. Because the Reducer receives a mix of data types (the plain distance values sent by neighbors, plus the Node object itself), it needs a way to tell them apart at runtime; one way to do this in Hadoop is to use ObjectWritable and reflection. A critical optimization is to add a last_updated field to the Node object so that Mappers only propagate messages when the node's distance actually changed in the previous iteration, which cuts unnecessary network traffic.
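
The last_updated optimization can be sketched as follows. To keep the example short, a boolean `changed` flag is used as a hypothetical stand-in for last_updated; the driver loop, the graph, and the message counter are all illustrative.

```python
import math
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class Node:
    d: float = math.inf
    adjList: list = field(default_factory=list)
    changed: bool = False   # hypothetical stand-in for last_updated

def map_node(node_id, node):
    yield node_id, node                  # structure is always preserved
    if node.changed:                     # propagate only fresh distances
        for m in node.adjList:
            yield m, node.d + 1

def reduce_node(node_id, values):
    node = next(o for o in values if isinstance(o, Node))
    best = min((o for o in values if not isinstance(o, Node)), default=math.inf)
    improved = best < node.d             # becomes the flag for the next round
    return node_id, Node(min(node.d, best), node.adjList, improved)

def run(graph):
    jobs = messages = 0
    while any(n.changed for n in graph.values()):
        shuffled = defaultdict(list)
        for node_id, node in graph.items():
            for key, value in map_node(node_id, node):
                shuffled[key].append(value)
                if not isinstance(value, Node):
                    messages += 1        # count distance messages only
        graph = dict(reduce_node(k, vs) for k, vs in shuffled.items())
        jobs += 1
    return graph, jobs, messages

graph = {
    "s": Node(0, ["a", "b"], changed=True),   # the source starts out "changed"
    "a": Node(adjList=["c"]),
    "b": Node(adjList=["c"]),
    "c": Node(),
}
final, jobs, messages = run(graph)
```

With the flag in place, a node whose distance has settled stops resending the same value every round, and an iteration with no changed nodes doubles as the stopping condition.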

Because MapReduce is completely stateless, it has "amnesia" between jobs; it can't keep the graph in memory for the next loop the way a normal program would. So, for every single iteration of our algorithm, we are forced to write the entire graph (including the massive adjacency lists that never change) to HDFS on disk, only to read it straight back for the next round. It is a massive waste of I/O and time, which is a large part of why newer tools like Spark keep data in memory. In the strict MapReduce world, this expensive "disk-and-back" cycle is the unavoidable penalty we pay for iteration.

Optimization & Termination

  • Path Reconstruction: The basic algorithm only calculates distance. To reconstruct the actual path, the Node object must store a previous field (the ID of the node that provided the shortest distance).
  • Termination: We stop iterating when a full job completes with zero updates to any node's distance (the graph has converged). The driver has to check this after every job, for example by counting the updated nodes.
  • Performance Reality:

    • Kevin Bacon Number: Real-world graphs (like social networks) often have small diameters (~6 hops), so the number of iterations is manageable.
    • Frontier Size: The number of active nodes (frontier) grows rapidly then shrinks, varying based on the link distribution.
      • "Active Nodes" (the Frontier) are only the nodes that changed their distance in the previous round.

    Here is the reality check on whether this algorithm actually finishes before the heat death of the universe. The good news is that real-world graphs, like social networks or the web, exhibit the "Small World" property: the diameter is surprisingly small (often around 6 hops, like the Kevin Bacon number), so we typically only need to chain about 6 or 7 MapReduce jobs to traverse the whole thing, which is entirely manageable. The thing to watch is the "Frontier", the set of nodes active in any given iteration. It doesn't grow linearly; it explodes outward in the middle iterations before dying down, so the cluster faces a massive spike in workload halfway through the process and then idles at the end.
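
The path-reconstruction idea from the list above can be sketched like this. It assumes each distance message also carries the sender's ID, and that nodes are plain dicts with a `prev` entry; both the message shape and the function names are illustrative choices, not something the notes prescribe.

```python
import math

def reduce_with_prev(node, messages):
    """node: dict with 'd', 'prev', 'adjList'; messages: list of (distance, sender_id)."""
    updated = dict(node)
    for distance, sender in messages:
        if distance < updated["d"]:      # strictly better: remember who sent it
            updated["d"] = distance
            updated["prev"] = sender
    return updated

def reconstruct_path(nodes, target):
    """Walks the prev pointers back to the source; None if target is unreachable."""
    if nodes[target]["d"] == math.inf:
        return None
    path = [target]
    while nodes[path[-1]]["prev"] is not None:
        path.append(nodes[path[-1]]["prev"])
    return path[::-1]

nodes = {
    "s": {"d": 0, "prev": None, "adjList": ["a", "b"]},
    "a": {"d": 1, "prev": "s", "adjList": ["c"]},
    "b": {"d": 1, "prev": "s", "adjList": ["c"]},
    "c": {"d": math.inf, "prev": None, "adjList": []},
}
# Both a and b offer distance 2 to c; the first offer wins the tie.
nodes["c"] = reduce_with_prev(nodes["c"], [(2, "a"), (2, "b")])
print(reconstruct_path(nodes, "c"))
```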

Code Adaptation for Weighted Edges

To support weighted graphs, the algorithm structure remains largely identical to the unweighted version. The only critical change occurs in the Mapper, where the distance calculation now uses the specific edge weight (m.w) instead of a constant 1.

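import math
from dataclasses import dataclass, field

@dataclass
class Edge:                         # Assumed shape of an adjacency-list entry
    id: str                         # Destination node ID
    w: float                        # Edge weight

@dataclass
class Node:                         # Minimal Node structure (an assumption of this sketch)
    d: float = math.inf
    adjList: list = field(default_factory=list)  # Outgoing Edge objects

# Mapper: Now adds specific edge weights (yield plays the role of emit)
def map_node(id, node):
    yield id, node                    # Pass structure forward
    if node.d < math.inf:             # Unreached nodes have nothing to announce
        for m in node.adjList:
            # KEY CHANGE: Use m.w (weight) instead of +1
            yield m.id, node.d + m.w

# Reducer: Logic remains exactly the same
def reduce_node(id, values):
    d = math.inf
    node = None
    for o in values:
        if isinstance(o, Node):
            node = o
        else:
            d = min(d, o)             # Still just finding the minimum

    if node is None:                  # ID seen only as a neighbor
        node = Node()
    node.d = min(node.d, d)
    yield id, node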

Termination Complexity (The "Fuzzy Frontier")

  • Re-visiting Nodes: Unlike unweighted BFS, where the first distance a node receives is final, weighted graphs allow a node to be updated multiple times: a path with more hops but a smaller total weight can arrive in a later iteration. This creates a "Fuzzy Frontier".
  • Iteration Count: The number of iterations is no longer determined by the graph's diameter (hops). Instead, it depends on the length in hops of the longest cycle-free path, which can be significantly larger (up to \(|V| - 1\) edges in the worst case), potentially requiring many more MapReduce jobs. For unweighted graphs, the number of iterations needed is the length of the longest "shortest path" (usually short).
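
A tiny worked example shows the "Fuzzy Frontier" in action. In the made-up graph below, t is one hop from s over a heavy edge (weight 10), but the cheapest route takes three light hops. For brevity the sketch keeps the adjacency lists outside the distance table instead of re-emitting them; `run_job` again stands in for one simulated MapReduce job.

```python
import math
from collections import defaultdict

def run_job(dist, adj):
    """One simulated job: every reached node offers d + w to each out-neighbor."""
    offers = defaultdict(list)
    for u, d in dist.items():
        if d < math.inf:
            for v, w in adj[u]:
                offers[v].append(d + w)
    # Each node keeps the minimum of its old distance and all offers.
    return {u: min([d] + offers[u]) for u, d in dist.items()}

adj = {"s": [("t", 10), ("a", 1)], "a": [("b", 1)], "b": [("t", 1)], "t": []}
dist = {"s": 0, "a": math.inf, "b": math.inf, "t": math.inf}

history = []                  # distance of t after each job
jobs = 0
while True:
    new_dist = run_job(dist, adj)
    jobs += 1
    history.append(new_dist["t"])
    if new_dist == dist:      # zero updates: converged
        break
    dist = new_dist

print(history, jobs)
```

Node t is updated twice: first to 10 through the direct edge, then to 3 once the three-hop path catches up. The run needs more jobs than the hop distance to t alone would suggest.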

Parallel BFS vs. Dijkstra

  • Dijkstra's Limitation: Dijkstra efficiently investigates only the "lowest-cost" node using a global priority queue. This requires Global State, which does not exist in MapReduce.
  • The MapReduce Compromise: We are forced to use Parallel BFS, which blindly investigates all active paths in parallel. It is less efficient (investigates more nodes) but is the only way to operate without shared memory.

Note

The concepts above were still a bit confusing to me, so I asked Gemini to generate a sample program (Spark, in Scala):

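// --- STEP 1: INITIALIZATION ---
// Assumed to be defined elsewhere (not shown in this snippet):
//   graph: RDD[(Int, (Int, List[Int]))]  -- (NodeID, (Distance, Neighbors))
//   startNodeID: Int                     -- the source node
// We start by creating the initial state of the graph.
// This is done BEFORE the loop starts.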
var currentGraph = graph.map { case (id, (dist, neighbors)) =>
  // Check: Is this the specific Start Node we are searching from?
  if (id == startNodeID) {
    // Yes: Set distance to 0 (Source)
    // Structure: (NodeID, (Distance=0, [List of Neighbors]))
    (id, (0, neighbors))
  } else {
    // No: Set distance to "Infinity" (Int.MaxValue)
    // We haven't found a path to this node yet.
    (id, (Int.MaxValue, neighbors))
  }
}

var iterationCount = 0
var updatesMade = 0L // Tracks how many nodes changed distance in this round

// --- STEP 2: THE ITERATION LOOP (The "Job Chain") ---
// This loop replaces the manual "Job 1 -> Job 2 -> Job 3" chaining in Hadoop.
do {
  iterationCount += 1

  // ============================================================
  // THE MAPPER PHASE (flatMap)
  // Corresponds to the Mapper in Slide image_20db61.png
  // ============================================================
  val messages = currentGraph.flatMap { case (id, (currentDist, neighbors)) =>

    // ----------------------------------------------------------
    // JOB A: SELF PRESERVATION (Topology)
    // Slide Reference: "emit(id, node)"
    // ----------------------------------------------------------
    // We must emit the node AS-IS. If we don't, the Reducer will receive
    // distance updates but won't know who the neighbors are for the NEXT round.
    // The node would effectively lose its edges and become a dead end.
    val selfPreservation = Seq((id, (currentDist, neighbors)))

    // ----------------------------------------------------------
    // JOB B: PROPAGATION (Shouting to Neighbors)
    // Slide Reference: "for m in adjList: emit(m, node.d + 1)"
    // ----------------------------------------------------------
    // We only send messages if we have actually been visited (dist < Infinity).
    val neighborMessages = if (currentDist < Int.MaxValue) {
      neighbors.map(neighborID => {
        // Create a message for the neighbor "m".
        // Key: neighborID (so it routes to the right place)
        // Value: (New Distance Proposal, Empty List)
        // Note: The list is empty because we don't know the neighbor's friends!
        (neighborID, (currentDist + 1, List.empty[Int]))
      })
    } else {
      // If we haven't been visited yet, we stay silent.
      Seq.empty
    }

    // Combine both sets of messages into one big stream.
    // Spark will now shuffle these messages across the network so that
    // all messages for "Node X" end up on the same machine.
    selfPreservation ++ neighborMessages
  }

  // ============================================================
  // THE REDUCER PHASE (reduceByKey)
  // Corresponds to the Reducer in Slide image_20db61.png
  // ============================================================
  val newGraph = messages.reduceByKey { (messageA, messageB) =>
    // This function runs whenever two messages land on the same Node ID.
    // We need to merge them into one best result.

    // 1. RECOVER STRUCTURE
    // One of these messages is the "Self Preservation" message containing the
    // real neighbor list. The others are just "Distance Proposals" with empty lists.
    // We check which one has data and keep it.
    // Slide Reference: "if isNode(o): node = o"
    val neighbors = if (messageA._2.nonEmpty) messageA._2 else messageB._2

    // 2. MINIMIZE DISTANCE
    // Compare the distances and keep the smallest one.
    // Example: messageA might say "Distance 5", messageB might say "Distance 3".
    // Slide Reference: "d = min(d, o)"
    val minDist = math.min(messageA._1, messageB._1)

    // Return the merged result: Best Distance + The Neighbor List
    (minDist, neighbors)
  }.cache() // Keep the result in memory; otherwise every iteration recomputes the whole lineage.

  // ============================================================
  // THE CHECK (Counting Changes)
  // Corresponds to "Quitting Time" in Slide image_20db85.jpg
  // ============================================================
  // We join the New Graph with the Old Graph to see who actually changed.
  // The join and filter are lazy; the count() below is the "Action" that forces Spark to run the job.
  val changes = newGraph.join(currentGraph).filter {
    case (id, ((newDist, _), (oldDist, _))) =>
      // A change only counts if we found a SHORTER path.
      newDist < oldDist
  }.count()

  // Update our state for the next loop
  updatesMade = changes
  currentGraph = newGraph

  println(s"Iteration $iterationCount: Updated $updatesMade nodes.")

// Continue looping as long as we are still making updates (The Frontier > 0).
} while (updatesMade > 0)