Common Spark Concepts That May Be Asked in an Interview

In this article I will put some common Spark concepts in a tabular format (so that it stays compact). These are good concepts to remember, and they often come up in interview questions.

Types of join strategies

| Join Strategy | Description | Use Case | Example |
| --- | --- | --- | --- |
| Shuffle Hash Join | Both DataFrames are shuffled on the join keys, and a hash join is then performed. | Useful when both DataFrames are large and the join keys are well distributed. | `spark.conf.set("spark.sql.join.preferSortMergeJoin", "false"); df1.join(df2, "key")` |
| Broadcast Hash Join | One DataFrame is small enough to fit in memory and is broadcast to all worker nodes; a hash join is then performed. | Efficient when one DataFrame is much smaller than the other. | `df1.join(broadcast(df2), "key")` |
| Sort-Merge Join | Both DataFrames are sorted on the join keys and then merged. This requires a shuffle if the data is not already sorted. | Suitable for large DataFrames when the join keys are sorted or can be sorted efficiently. | `spark.conf.set("spark.sql.join.preferSortMergeJoin", "true"); df1.join(df2, "key")` |
| Cartesian Join (Cross Join) | Every row of one DataFrame is paired with every row of the other DataFrame. | Generally not recommended because of its computational cost, but useful for generating all combinations of rows. | `df1.crossJoin(df2)` |
| Broadcast Nested Loop Join | The smaller DataFrame is broadcast, and a nested loop join is performed. | Used when there are no join keys or the join condition is too complex for hash or sort-merge joins. | `df1.join(broadcast(df2))` |
| Shuffle-and-Replicate Nested Loop Join | Both DataFrames are shuffled and replicated to perform a nested loop join. | Used for complex join conditions that other join strategies cannot handle. | `df1.join(df2, expr("complex_condition"))` |
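
To see which strategy Spark actually picks, you can inspect the physical plan with explain(). Below is a minimal sketch, assuming two small in-memory DataFrames named df1 and df2 (the names and data are illustrative only):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Minimal sketch: two hypothetical DataFrames joined with an explicit
// broadcast hint, then explain() to confirm which strategy Spark picked.
val spark = SparkSession.builder.appName("JoinStrategies").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("key", "left_val")
val df2 = Seq((1, "x"), (2, "y")).toDF("key", "right_val")

// Force a broadcast hash join by hinting that df2 is small.
df1.join(broadcast(df2), "key").explain()   // look for BroadcastHashJoin

// Disable auto-broadcast to see a sort-merge (or shuffle hash) join instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
df1.join(df2, "key").explain()              // look for SortMergeJoin
```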

Types of Joins

| Join Type | Description | Example |
| --- | --- | --- |
| Inner Join | Returns rows that have matching values in both DataFrames. | `df1.join(df2, "key")` |
| Outer Join | Returns all rows when there is a match in either DataFrame; missing values are filled with nulls. | `df1.join(df2, Seq("key"), "outer")` |
| Left Outer Join | Returns all rows from the left DataFrame and the matched rows from the right DataFrame. | `df1.join(df2, Seq("key"), "left_outer")` |
| Right Outer Join | Returns all rows from the right DataFrame and the matched rows from the left DataFrame. | `df1.join(df2, Seq("key"), "right_outer")` |
| Left Semi Join | Returns only the rows from the left DataFrame that have a match in the right DataFrame. | `df1.join(df2, Seq("key"), "left_semi")` |
| Left Anti Join | Returns only the rows from the left DataFrame that do not have a match in the right DataFrame. | `df1.join(df2, Seq("key"), "left_anti")` |
| Cross Join | Returns the Cartesian product of both DataFrames: every row in the left DataFrame is combined with every row in the right DataFrame. | `df1.crossJoin(df2)` |
| Self Join | A join in which a DataFrame is joined with itself; it can be an inner, outer, left, or right join. Use aliases to avoid ambiguous column references. | `df.as("a").join(df.as("b"), $"a.key1" === $"b.key2")` |
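
Here is a minimal sketch of a few of these join types in action, using two made-up DataFrames (the employees and departments names and values are illustrative, not from any real dataset):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of several join types on two small, made-up DataFrames.
val spark = SparkSession.builder.appName("JoinTypes").getOrCreate()
import spark.implicits._

val employees   = Seq((1, "Alice"), (2, "Bob"), (3, "Cara")).toDF("dept_id", "name")
val departments = Seq((1, "Sales"), (2, "HR")).toDF("dept_id", "dept")

employees.join(departments, Seq("dept_id"), "inner").show()      // matching rows only
employees.join(departments, Seq("dept_id"), "left_outer").show() // Cara kept, dept is null
employees.join(departments, Seq("dept_id"), "left_anti").show()  // only Cara
employees.join(departments, Seq("dept_id"), "left_semi").show()  // Alice and Bob, left columns only

// Self join with aliases to avoid ambiguous column references.
employees.as("a")
  .join(employees.as("b"), $"a.dept_id" === $"b.dept_id" && $"a.name" =!= $"b.name")
  .show()
```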

Common Spark optimization techniques

| Technique | What it is | When to use | Example |
| --- | --- | --- | --- |
| Caching and Persistence | Storing data in memory for quick access | When you need to reuse the same data multiple times | `val cachedData = df.cache()` |
| Broadcast Joins / Variables | Sending a small dataset to all worker nodes | When one dataset is much smaller than the other | `largeDF.join(broadcast(smallDF), "key")` |
| Partitioning | Dividing data into smaller, manageable chunks | When dealing with large datasets, to improve parallel processing | `val partitionedData = df.repartition(10, $"key")` |
| Avoiding Shuffles | Reducing the movement of data between nodes | To improve performance by minimizing network overhead | Use `mapPartitions` instead of `groupBy` when possible |
| Coalesce | Reducing the number of partitions | When the data has become sparse after a transformation | `val coalescedData = df.coalesce(1)` |
| Predicate Pushdown | Filtering data as early as possible in the processing | To reduce the amount of data read and processed | `val filteredData = df.filter($"column" > 10)` |
| Using the Right Join Strategy | Choosing the most efficient way to join two datasets | Based on the size and distribution of the data | Prefer broadcast joins for small datasets |
| Tuning Spark Configurations | Adjusting settings to optimize resource usage | To match the workload and cluster resources | `spark.conf.set("spark.executor.memory", "4g")` |
| Using the DataFrame/Dataset API | Leveraging the high-level APIs for optimizations | To benefit from the Catalyst optimizer and the Tungsten execution engine | `val df = spark.read.csv("data.csv"); df.groupBy("column").count()` |
| Vectorized Query Execution | Processing multiple rows of data at a time | For high-performance operations on large datasets | Use built-in SQL functions and DataFrame methods |
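
Here is a rough sketch that combines several of these techniques in one pipeline. The file paths, column names, and the 4g memory figure are placeholders, not recommendations:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.storage.StorageLevel

// Sketch only: combines caching, predicate pushdown, a broadcast join,
// repartitioning, and coalesce. All paths and column names are made up.
val spark = SparkSession.builder
  .appName("OptimizationSketch")
  .config("spark.executor.memory", "4g")           // tune to your cluster
  .getOrCreate()
import spark.implicits._

val large = spark.read.parquet("/data/events")       // hypothetical path
val small = spark.read.parquet("/data/dim_country")  // hypothetical path

// Predicate pushdown: filter as early as possible so less data is scanned.
val recent = large.filter($"event_date" >= "2024-01-01")

// Cache a DataFrame that is reused several times.
val cached = recent.persist(StorageLevel.MEMORY_AND_DISK)

// Broadcast join: avoid shuffling the large side when one side is small.
val joined = cached.join(broadcast(small), "country_id")

// Repartition by the aggregation key, then reduce output files with coalesce.
joined.repartition(10, $"country_id")
  .groupBy("country_id").count()
  .coalesce(1)
  .write.mode("overwrite").parquet("/data/out")      // hypothetical path
```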

Different phases of Spark-SQL engine

| Phase | Description | Details | Example |
| --- | --- | --- | --- |
| Parsing | Converting SQL queries into a logical plan. | The SQL query is parsed into an abstract syntax tree (AST). | Converting `SELECT * FROM table WHERE id = 1` into an internal format. |
| Analysis | Resolving references and verifying the logical plan. | Resolves column names, table names, and function names; checks for errors. | Ensuring that the table `table` and the column `id` exist in the database. |
| Optimization | Improving the logical plan for better performance. | Transforms the logical plan using optimization rules applied by the Catalyst optimizer. | Reordering filters to reduce the amount of data processed early on. |
| Physical Planning | Converting the logical plan into a physical plan. | Generates one or more physical plans from the optimized logical plan and selects the most efficient one. | Deciding whether to use a hash join or a sort-merge join. |
| Code Generation | Generating executable code from the physical plan. | Generates Java bytecode to execute the physical plan; this code runs on Spark executors. | Creating code to perform join operations, filter data, and compute results. |
| Execution | Running the generated code on the Spark cluster. | The generated code is distributed across the cluster and executed by Spark executors; the results are collected and returned. | Running join and filter operations on different nodes in the cluster and aggregating the results. |
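
An easy way to see these phases for yourself is explain(true), which prints the parsed, analyzed, and optimized logical plans plus the final physical plan. A minimal sketch (the table name `numbers` is made up for this example):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: explain(true) prints the output of each phase described above.
val spark = SparkSession.builder.appName("SqlPhases").getOrCreate()

spark.range(1000).toDF("id").createOrReplaceTempView("numbers")

spark.sql("SELECT id FROM numbers WHERE id > 10").explain(true)
// == Parsed Logical Plan ==     result of the parsing phase
// == Analyzed Logical Plan ==   after analysis (references resolved)
// == Optimized Logical Plan ==  after Catalyst optimization
// == Physical Plan ==           after physical planning and code generation
```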

Common reasons for analysis exception in Spark

Here are some common reasons why you might encounter an AnalysisException in Spark:

| Reason | Description | Example |
| --- | --- | --- |
| Non-Existent Column or Table | The column or table specified does not exist. | Referring to a non-existent column `id`. |
| Ambiguous Column Reference | The same column name exists in multiple tables without qualification. | Joining two DataFrames that both have a column named `id`. |
| Invalid SQL Syntax | The SQL query has syntax errors. | Using incorrect SQL syntax such as `SELCT`. |
| Unsupported Operations | Using an operation that Spark SQL does not support. | Using an unsupported function. |
| Schema Mismatch | The schema of the DataFrame does not match the expected schema. | Inserting data with different column types. |
| Missing File or Directory | The specified file or directory does not exist. | Referring to a non-existent CSV file. |
| Incorrect Data Type | An operation expecting a specific data type is given the wrong type. | Performing a math operation on a string column. |
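
A minimal sketch of triggering and catching an AnalysisException by selecting a column that does not exist (the DataFrame contents are illustrative):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Minimal sketch: selecting a column that is not in the schema
// raises an AnalysisException at analysis time.
val spark = SparkSession.builder.appName("AnalysisExceptionDemo").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

try {
  df.select("no_such_column").show()   // column is not in the schema
} catch {
  case e: AnalysisException =>
    println(s"Analysis failed: ${e.getMessage}")
}
```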

Flow of how Spark works internally

| Component/Step | Role/Process | Function/Example |
| --- | --- | --- |
| Driver Program | Entry point for the Spark application | Manages the application lifecycle; defines RDD transformations and actions |
| SparkContext | Acts as the master of the Spark application | Connects to the cluster manager; coordinates tasks |
| Cluster Manager | Manages the cluster of machines | Allocates resources to Spark applications; examples: YARN, Mesos, standalone |
| Executors | Processes on worker nodes that run tasks and store data | Execute the assigned code; return results to the driver; cache data in memory for quick access |
| Spark Application Submission | Submitting the driver program to the cluster manager | Example: submitting a job using `spark-submit` |
| SparkContext Initialization | The driver program initializes the SparkContext | Example: `val sc = new SparkContext(conf)` |
| Job Scheduling | The driver program defines transformations and actions on RDDs/DataFrames | Example: `val rdd = sc.textFile("data.txt").map(line => line.split(" "))` |
| DAG (Directed Acyclic Graph) Creation | Constructing a DAG of stages for the job | Stages are sets of tasks that can be executed in parallel; example: a series of map and filter transformations create a DAG |
| Task Execution | Dividing the DAG into stages, creating tasks, and sending them to executors | Tasks are distributed across executors; each executor processes a partition of the data |
| Data Shuffling | Exchanging data between nodes during operations like `reduceByKey` | Data is grouped by key across nodes; example: shuffling data for aggregations |
| Result Collection | Executors process the tasks and send the results back to the driver program | Example: the final results of `collect` or `count` are returned to the driver |
| Job Completion | The driver program completes the execution | Example: the driver terminates after executing `sc.stop()` |
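
A minimal end-to-end sketch of this flow using a classic word count; the class name, input file, and spark-submit arguments are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: the driver builds the SparkSession, transformations define the
// DAG lazily, and the action (collect) triggers job scheduling, task
// execution, shuffling, and result collection back at the driver.
object WordCountApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("WordCountApp").getOrCreate()
    val sc = spark.sparkContext

    val counts = sc.textFile("data.txt")      // read (no job runs yet)
      .flatMap(_.split(" "))                  // narrow transformation
      .map(word => (word, 1))                 // narrow transformation
      .reduceByKey(_ + _)                     // wide transformation -> shuffle

    counts.collect().foreach(println)         // action: triggers the job
    spark.stop()                              // job completion
  }
}

// Submitted to a cluster manager with something like:
// spark-submit --class WordCountApp --master yarn word-count.jar
```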

Explain DAG in Spark

| Topic | Description | Details | Example |
| --- | --- | --- | --- |
| DAG in Spark | DAG stands for Directed Acyclic Graph. | A series of steps representing the operations on data. Directed: operations flow in one direction. Acyclic: no cycles or loops. | N/A |
| Why We Need DAG | Optimized execution, fault tolerance, and parallel processing. | Optimized execution: Spark can optimize the chain of operations. Fault tolerance: lost data can be recomputed from the lineage if a node fails. Parallel processing: the job is divided into stages that run in parallel. | N/A |
| Without DAG | No optimization, no fault tolerance, and less parallelism. | No optimization: operations would run exactly as written, with slower performance. No fault tolerance: recomputing lost data would be inefficient. Less parallelism: tasks would be harder to parallelize. | N/A |
| Example | A Spark word-count job and the DAG it builds. | Read data: `sc.textFile("file.txt")`; split lines into words: `data.flatMap(...)`; map words to key-value pairs: `words.map(...)`; reduce by key: `wordCounts.reduceByKey(...)`; collect results: `wordCounts.collect()` | `val data = sc.textFile("file.txt"); val words = data.flatMap(line => line.split(" ")); val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _); wordCounts.collect()` |
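
A sketch of the same word count, assuming an existing SparkContext named sc (for example, the one from the previous section); toDebugString prints the RDD lineage, and the shuffle introduced by reduceByKey marks the boundary between the job's two stages:

```scala
// Minimal sketch: build the word-count lineage and print it.
val data = sc.textFile("file.txt")
val words = data.flatMap(line => line.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Shows the DAG of RDD dependencies, e.g. ShuffledRDD <- MapPartitionsRDD <- ...
println(wordCounts.toDebugString)

wordCounts.collect()   // action that actually runs the two stages
```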

Explain spark.sql.shuffle.partitions Variable

| Topic | Description | Details | Example |
| --- | --- | --- | --- |
| `spark.sql.shuffle.partitions` | Configuration setting for shuffle partitions | Default value: 200. Defines the number of partitions created when data is shuffled for wide transformations on DataFrames and in SQL queries. | `spark.conf.set("spark.sql.shuffle.partitions", "number_of_partitions")` |
| Purpose | Optimize performance and control data distribution | Optimize performance: balances the workload across the cluster. Control data distribution: manages how data is distributed and processed during shuffle operations. | N/A |
| When It's Used | Wide transformations and SQL queries | Wide DataFrame transformations and SQL operations that shuffle data, such as `groupBy`, `join`, `distinct`, and aggregations. (Equivalent RDD operations such as `reduceByKey` and `groupByKey` are governed by `spark.default.parallelism` instead.) | N/A |
| How to Set It | Via configuration or `spark-submit` | Configuration: `spark.conf.set("spark.sql.shuffle.partitions", "number_of_partitions")`. spark-submit: `spark-submit --conf spark.sql.shuffle.partitions=number_of_partitions ...` | N/A |
| Example | Default and custom settings | Default setting: `println(spark.conf.get("spark.sql.shuffle.partitions")) // 200`. Custom setting: `spark.conf.set("spark.sql.shuffle.partitions", "50"); spark.read.json("data.json").groupBy("column").count().show()` | `val spark = SparkSession.builder.appName("Example").getOrCreate(); println(spark.conf.get("spark.sql.shuffle.partitions")) // 200` |
| Why Adjust This Setting? | Small and large datasets | Small datasets: reduce the number of partitions to avoid many tiny tasks and the scheduling overhead they cause. Large datasets: increase the number of partitions to distribute data evenly and avoid oversized partitions that slow down processing. | N/A |
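
A minimal sketch showing how the setting changes the partition count after a wide transformation. Adaptive query execution is disabled here only so that the raw setting is visible; with AQE on, Spark may coalesce shuffle partitions at runtime:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: observe the effect of spark.sql.shuffle.partitions after a groupBy.
val spark = SparkSession.builder
  .appName("ShufflePartitions")
  .config("spark.sql.adaptive.enabled", "false")   // keep partition count predictable for the demo
  .getOrCreate()
import spark.implicits._

println(spark.conf.get("spark.sql.shuffle.partitions"))   // default: 200

spark.conf.set("spark.sql.shuffle.partitions", "50")

val df = spark.range(0, 1000000).withColumn("bucket", $"id" % 100)
val aggregated = df.groupBy("bucket").count()             // wide transformation -> shuffle

println(aggregated.rdd.getNumPartitions)                  // 50, matching the setting
```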