Understanding Broadcast Variables in Spark

Imagine a scenario from the movie “The Matrix” where Morpheus shares a training program with Neo and the other rebels. Instead of loading the training program into the simulation multiple times for each person, Morpheus sends it once to each person’s mind, and they use it as needed.

This is similar to broadcast variables in Spark, which are used mainly for performance tuning. A broadcast variable lets you send a small, read-only dataset to every worker node in a cluster once, so tasks can read it locally. This can reduce shuffling and make operations faster.

When to Use Broadcast Variables

Use a broadcast variable when every node needs its own copy of a small dataset. This is ideal for joins where one side is small (like a dimension table). It is not suitable for large tables: the driver must hold the data to broadcast it and each executor keeps a full copy, so a table larger than the available memory will cause an out-of-memory (OOM) error.
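As a rough pre-check before broadcasting, you can estimate how large the table is once serialized. The sketch below is not part of Spark: fits_broadcast_budget, pickled_size and the 10 MB default are hypothetical choices made for illustration (10 MB happens to match the default of Spark SQL's spark.sql.autoBroadcastJoinThreshold). The pickled size is only a proxy for the memory the table will occupy on each executor.

```python
import pickle

# Hypothetical budget for illustration; tune it to your executors' memory.
DEFAULT_BUDGET_BYTES = 10 * 1024 * 1024  # 10 MB


def pickled_size(obj):
    """Return the size in bytes of obj once pickled."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def fits_broadcast_budget(obj, budget_bytes=DEFAULT_BUDGET_BYTES):
    """Return True if the pickled object fits within the byte budget."""
    return pickled_size(obj) <= budget_bytes


# A tiny lookup table, and a much larger one made of distinct 100-character strings.
lookup_dict = {"A": 1, "B": 2, "C": 3}
big_table = {i: f"row-{i:06d}" * 10 for i in range(200_000)}

print(fits_broadcast_budget(lookup_dict))  # small table: within budget
print(fits_broadcast_budget(big_table))    # about 20 MB of strings: over budget
```

A check like this is cheap to run on the driver and can be used to fall back to a regular join when the table turns out to be too big.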

Example

Here are two scenarios that show how broadcast variables can improve joins in a Spark cluster.

Scenario Without Broadcast Variables

Suppose you have a small lookup table (e.g., lookup_dict) and a large dataset (rdd). You want to join the large dataset with the lookup table. The snippets assume an existing SparkContext named sc, as in the PySpark shell.

# Small lookup table
lookup_dict = {"A": 1, "B": 2, "C": 3}

# Large RDD
rdd = sc.parallelize(["A", "B", "C", "D", "E", "F", "G", "H"])

# Join operation without broadcast
# join() works on (key, value) pairs, so pair each key with a placeholder value
lookup_rdd = sc.parallelize(list(lookup_dict.items()))
joined_rdd = rdd.map(lambda x: (x, None)).join(lookup_rdd)

In this scenario, Spark needs to shuffle data to perform the join operation, which is expensive and time-consuming.

Scenario With Broadcast Variables

Using a broadcast variable, you can send the lookup table to all nodes just once, avoiding the shuffle operation.

# Small lookup table
lookup_dict = {"A": 1, "B": 2, "C": 3}

# Broadcast the lookup table
broadcast_var = sc.broadcast(lookup_dict)

# Large RDD
rdd = sc.parallelize(["A", "B", "C", "D", "E", "F", "G", "H"])

# Use the broadcast variable to look up values
result_rdd = rdd.map(lambda x: (x, broadcast_var.value.get(x, 0)))

# Collect and print the results
print(result_rdd.collect())
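The two scenarios do not return the same rows: join() is an inner join, so it keeps only the keys found in the lookup table, while the map above keeps every element and fills in 0 for misses. The Spark-free sketch below shows both behaviours as plain functions. The names with_default and inner_only are hypothetical; in Spark they would be passed to rdd.map and rdd.flatMap respectively, with broadcast_var.value in place of lookup.

```python
lookup = {"A": 1, "B": 2, "C": 3}
keys = ["A", "B", "C", "D", "E", "F", "G", "H"]


def with_default(key, table, default=0):
    """map-style lookup: always emit one pair, using a default for misses."""
    return (key, table.get(key, default))


def inner_only(key, table):
    """flatMap-style lookup: emit a pair only when the key exists."""
    return [(key, table[key])] if key in table else []


all_rows = [with_default(k, lookup) for k in keys]
matched_rows = [pair for k in keys for pair in inner_only(k, lookup)]

print(all_rows)      # 8 pairs, unmatched keys mapped to 0
print(matched_rows)  # [('A', 1), ('B', 2), ('C', 3)]
```

Which variant you want depends on whether unmatched records should survive the lookup.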

How Broadcast Variables Improve Performance

When you join or group DataFrames, Spark usually shuffles data across all worker nodes, which is slow and costly.

  • Avoiding Shuffle:
    • Without Broadcast: Spark shuffles both the lookup table and the large dataset across the network so that records with the same key end up on the same node.
    • With Broadcast: The lookup table is sent once to all nodes, allowing local lookups without moving data around.
  • Efficiency:
    • Without Broadcast: The join operation involves heavy data movement and sorting, which is slow.
    • With Broadcast: The lookup is done locally on each node using the cached copy, making the operation much faster and reducing network traffic.
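The difference in data movement can be sketched with a toy, Spark-free model. Everything here is an assumption made for illustration: the four-node cluster, the node_for partitioner, and the idea of counting "records moved" as a stand-in for network traffic. Real Spark behaviour also depends on serialization, compression and the physical plan, so treat the numbers as qualitative only.

```python
from collections import defaultdict

NUM_NODES = 4  # hypothetical cluster size
LETTERS = "ABCDEFGH"

# Large dataset already spread across the nodes, plus a small lookup table.
large = [(LETTERS[i % 8], i) for i in range(1000)]
partitions = [large[n::NUM_NODES] for n in range(NUM_NODES)]
lookup = {"A": 1, "B": 2, "C": 3}


def node_for(key):
    """Deterministic stand-in for a hash partitioner."""
    return ord(key) % NUM_NODES


def shuffle_join(partitions, lookup):
    """Move both sides to the node that owns each key, then join there."""
    moved = 0
    big_side = defaultdict(list)
    small_side = defaultdict(dict)
    for node, part in enumerate(partitions):
        for key, value in part:
            target = node_for(key)
            moved += int(target != node)  # record crosses the network
            big_side[target].append((key, value))
    for key, code in lookup.items():  # lookup rows assumed to start on node 0
        target = node_for(key)
        moved += int(target != 0)
        small_side[target][key] = code
    rows = [(k, v, small_side[n][k])
            for n in range(NUM_NODES)
            for k, v in big_side[n] if k in small_side[n]]
    return rows, moved


def broadcast_join(partitions, lookup):
    """Copy the lookup table to every node once; large rows never move."""
    moved = len(lookup) * len(partitions)
    rows = [(k, v, lookup[k])
            for part in partitions
            for k, v in part if k in lookup]
    return rows, moved


shuffle_rows, shuffle_moved = shuffle_join(partitions, lookup)
broadcast_rows, broadcast_moved = broadcast_join(partitions, lookup)

# Both strategies produce the same joined rows; only the traffic differs.
assert sorted(shuffle_rows) == sorted(broadcast_rows)
print("records moved with shuffle:  ", shuffle_moved)
print("records moved with broadcast:", broadcast_moved)
```

In this model the shuffle cost grows with the size of the large dataset, while the broadcast cost grows only with the size of the lookup table times the number of nodes. That is why broadcasting pays off only when the table is small.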

Knowledge Check

A. A broadcast variable is a Spark object that needs to be partitioned onto multiple worker nodes because it’s too large to fit on a single worker node.

  • This statement is incorrect. Broadcast variables are designed to be efficiently distributed to each worker node. They are not partitioned but rather fully replicated on each worker node to avoid being sent multiple times.

B. A broadcast variable can only be created by an explicit call to the broadcast() operation.

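  • This statement is incorrect because of the word "only". Calling the broadcast() method is the explicit way to create a broadcast variable, but Spark can also broadcast data on its own, for example when Spark SQL picks a broadcast join for a table smaller than spark.sql.autoBroadcastJoinThreshold. This option also doesn’t describe the defining characteristic of broadcast variables, which is how they are used in the Spark architecture.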

C. A broadcast variable is entirely cached on the driver node so it doesn’t need to be present on any worker nodes.

  • This statement is incorrect. Broadcast variables are distributed to all worker nodes, not just cached on the driver node. Their purpose is to be available on each worker node to prevent the need to send them multiple times during execution.

D. A broadcast variable is entirely cached on each worker node so it doesn’t need to be shipped or shuffled between nodes with each stage.

  • This statement is correct. The primary characteristic of a broadcast variable is that it is distributed and cached on each worker node. This reduces communication overhead by avoiding repeated transmission of the variable during different stages of the job.

E. A broadcast variable is saved to the disk of each worker node to be easily read into memory when needed.

  • This statement is incorrect. Broadcast variables are cached in memory on each worker node so tasks can read them without the latency of a disk read. Spark may spill them to disk under memory pressure, but writing them to disk is not their purpose, which is to provide fast, in-memory access to frequently used data.

Summary

Broadcast variables are useful for distributing small, read-only datasets (like lookup tables) to all nodes in a Spark cluster. By broadcasting the lookup table, you avoid shuffling data during join operations, which can significantly improve performance by reducing network overhead and computation time.