Common Spark Concepts which may be asked in an interview
In this article I will try to put some common Spark concepts in a tabular format (so that it stays compact). These are good concepts to remember, and they often come up in interview questions.
Types of join strategies
Join Strategy | Description | Use Case | Example |
---|---|---|---|
Shuffle Hash Join | Both DataFrames are shuffled based on the join keys, and then a hash join is performed. | Useful when both DataFrames are large and have a good distribution of keys. | spark.conf.set("spark.sql.join.preferSortMergeJoin", "false") df1.join(df2, "key") |
Broadcast Hash Join | One of the DataFrames is small enough to fit in memory and is broadcasted to all worker nodes. A hash join is then performed. | Efficient when one DataFrame is much smaller than the other. | val broadcastDF = broadcast(df2) df1.join(broadcastDF, "key") |
Sort-Merge Join | Both DataFrames are sorted on the join keys and then merged. This requires a shuffle if the data is not already sorted. | Suitable for large DataFrames when the join keys are sorted or can be sorted efficiently. | spark.conf.set("spark.sql.join.preferSortMergeJoin", "true") df1.join(df2, "key") |
Cartesian Join (Cross Join) | Every row of one DataFrame is paired with every row of the other DataFrame. | Generally not recommended due to its computational expense, but can be used for generating combinations of all rows. | df1.crossJoin(df2) |
Broadcast Nested Loop Join | The smaller DataFrame is broadcasted, and a nested loop join is performed. | Used when there are no join keys or the join condition is complex and cannot be optimized with hash or sort-merge joins. | val broadcastDF = broadcast(df2) df1.join(broadcastDF) |
Shuffle-and-Replicate Nested Loop Join | Both DataFrames are shuffled and replicated to perform a nested loop join. | Used for complex join conditions that cannot be handled by other join strategies. | df1.join(df2, expr("complex_condition")) |
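To check which strategy Spark actually picks, you can call explain() on the joined DataFrame and look at the physical plan. Below is a minimal Scala sketch (the parquet paths and the customer_id key are placeholders) that forces a broadcast hash join with a hint and then falls back to a sort-merge join by disabling the auto-broadcast threshold.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object JoinStrategyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("JoinStrategyDemo").getOrCreate()

    // Placeholder inputs; any large/small DataFrame pair with a common key works.
    val orders    = spark.read.parquet("orders.parquet")
    val customers = spark.read.parquet("customers.parquet")

    // Broadcast hash join: hint that the smaller side fits in executor memory.
    orders.join(broadcast(customers), "customer_id").explain() // plan shows BroadcastHashJoin

    // Sort-merge join: disable auto-broadcast so Spark falls back to SortMergeJoin.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    orders.join(customers, "customer_id").explain()            // plan shows SortMergeJoin

    spark.stop()
  }
}
```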
Types of Joins
Join Type | Description | Example |
---|---|---|
Inner Join | Returns rows that have matching values in both DataFrames. | df1.join(df2, "key") |
Outer Join | Returns all rows when there is a match in either DataFrame. Missing values are filled with nulls. | df1.join(df2, Seq("key"), "outer") |
Left Outer Join | Returns all rows from the left DataFrame, and matched rows from the right DataFrame. | df1.join(df2, Seq("key"), "left_outer") |
Right Outer Join | Returns all rows from the right DataFrame, and matched rows from the left DataFrame. | df1.join(df2, Seq("key"), "right_outer") |
Left Semi Join | Returns only the rows from the left DataFrame that have a match in the right DataFrame. | df1.join(df2, Seq("key"), "left_semi") |
Left Anti Join | Returns only the rows from the left DataFrame that do not have a match in the right DataFrame. | df1.join(df2, Seq("key"), "left_anti") |
Cross Join | Returns the Cartesian product of both DataFrames. Every row in the left DataFrame will be combined with every row in the right DataFrame. | df1.crossJoin(df2) |
Self Join | A join in which a DataFrame is joined with itself. This can be an inner, outer, left, or right join. Alias the two sides so column references are unambiguous. | df.as("a").join(df.as("b"), col("a.key1") === col("b.key2")) |
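The differences are easiest to see on a small pair of DataFrames. This is a sketch with made-up employee/department data; the column names are only for illustration.

```scala
import org.apache.spark.sql.SparkSession

object JoinTypesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("JoinTypesDemo").getOrCreate()
    import spark.implicits._

    val employees = Seq((1, "Alice"), (2, "Bob"), (3, "Carol")).toDF("dept_id", "name")
    val depts     = Seq((1, "Sales"), (2, "HR"), (4, "Finance")).toDF("dept_id", "dept_name")

    employees.join(depts, Seq("dept_id"), "inner").show()      // only dept_ids 1 and 2 match
    employees.join(depts, Seq("dept_id"), "left_outer").show() // keeps Carol, dept_name is null
    employees.join(depts, Seq("dept_id"), "left_semi").show()  // matched rows, employee columns only
    employees.join(depts, Seq("dept_id"), "left_anti").show()  // employees with no department

    spark.stop()
  }
}
```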
Common Spark optimization techniques
Technique | What it is | When to use | Example |
---|---|---|---|
Caching and Persistence | Storing data in memory for quick access | When you need to reuse the same data multiple times | val cachedData = df.cache() |
Broadcast Variables | Sending a small dataset to all worker nodes | When one dataset is much smaller than the other | largeDF.join(broadcast(smallDF), "key") |
Partitioning | Dividing data into smaller, manageable chunks | When dealing with large datasets to improve parallel processing | val partitionedData = df.repartition(10, $"key") |
Avoiding Shuffles | Reducing the movement of data between nodes | To improve performance by minimizing network overhead | Use mapPartitions instead of groupBy when possible |
Coalesce | Reducing the number of partitions | When the data has become sparse after a transformation | val coalescedData = df.coalesce(1) |
Predicate Pushdown | Filtering data as early as possible in the processing | To reduce the amount of data read and processed | val filteredData = df.filter($"column" > 10) |
Using the Right Join Strategy | Choosing the most efficient way to join two datasets | Based on the size and distribution of data | Prefer broadcast joins for small datasets |
Tuning Spark Configurations | Adjusting settings to optimize resource usage | To match the workload and cluster resources | spark.conf.set("spark.executor.memory", "4g") |
Using DataFrames/Datasets API | Leveraging the high-level APIs for optimizations | To benefit from Catalyst optimizer and Tungsten execution engine | val df = spark.read.csv("data.csv") df.groupBy("column").count() |
Vectorized Query Execution | Processing multiple rows of data at a time | For high-performance operations on large datasets | Use built-in SQL functions and DataFrame methods |
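Several of these techniques usually appear together in one job. The sketch below combines early filtering, caching, repartitioning, and a broadcast join; the parquet paths and column names (event_date, key) are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object OptimizationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("OptimizationDemo").getOrCreate()
    import spark.implicits._

    // Placeholder inputs: a large fact table and a small lookup table.
    val events = spark.read.parquet("events.parquet")
    val lookup = spark.read.parquet("lookup.parquet")

    // Predicate pushdown: filter as early as possible so less data is scanned.
    val recent = events.filter($"event_date" > "2024-01-01")

    // Caching: reuse the filtered data across several actions without recomputing it.
    recent.cache()

    // Partitioning: repartition by the join key to spread work evenly across tasks.
    val partitioned = recent.repartition(10, $"key")

    // Broadcast join: avoid shuffling the large side when the other side is small.
    partitioned.join(broadcast(lookup), "key")
      .groupBy("key")
      .count()
      .show()

    spark.stop()
  }
}
```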
Different phases of the Spark SQL engine
Phase | Description | Details | Example |
---|---|---|---|
Parsing | Converting SQL queries into a logical plan. | The SQL query is parsed into an abstract syntax tree (AST). | Converting SELECT * FROM table WHERE id = 1 into an internal format. |
Analysis | Resolving references and verifying the logical plan. | Resolves column names, table names, and function names; checks for errors. | Ensuring that the referenced table and the column id actually exist in the catalog. |
Optimization | Improving the logical plan for better performance. | Transforms the logical plan using various optimization techniques; applies rules via Catalyst optimizer. | Reordering filters to reduce the amount of data processed early on. |
Physical Planning | Converting the logical plan into a physical plan. | Converts the optimized logical plan into one or more physical plans; selects the most efficient plan. | Deciding whether to use a hash join or a sort-merge join. |
Code Generation | Generating executable code from the physical plan. | Generates Java bytecode to execute the physical plan; this code runs on Spark executors. | Creating code to perform join operations, filter data, and compute results. |
Execution | Running the generated code on the Spark cluster. | Distributes generated code across the Spark cluster; executed by Spark executors; results collected and returned. | Running join and filter operations on different nodes in the cluster and aggregating results. |
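You can watch these phases on any query by printing the extended plan. A small sketch (the toy data is made up) that shows the parsed, analyzed, and optimized logical plans plus the physical plan:

```scala
import org.apache.spark.sql.SparkSession

object ExplainPhasesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ExplainPhasesDemo").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "category")

    // explain(true) prints the parsed, analyzed, and optimized logical plans
    // followed by the physical plan, i.e. the phases described in the table above.
    df.filter($"id" > 1).groupBy("category").count().explain(true)

    spark.stop()
  }
}
```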
Common reasons for analysis exception in Spark
Here are some common reasons why you might encounter an AnalysisException in Spark:
Reason | Description | Example |
---|---|---|
Non-Existent Column or Table | Column or table specified does not exist. | Referring to a non-existent column id. |
Ambiguous Column Reference | Same column name exists in multiple tables without qualification. | Joining two DataFrames with the same column name id. |
Invalid SQL Syntax | SQL query has syntax errors. | Using incorrect SQL syntax like SELCT. |
Unsupported Operations | Using an operation that Spark SQL does not support. | Using an unsupported function. |
Schema Mismatch | Schema of the DataFrame does not match the expected schema. | Inserting data with different column types. |
Missing File or Directory | Specified file or directory does not exist. | Referring to a non-existent CSV file. |
Incorrect Data Type | Operations expecting a specific data type are given the wrong type. | Performing a math operation on a string column. |
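Because analysis happens before execution, these errors surface as soon as the query is resolved. A minimal sketch showing how an AnalysisException is raised and caught (the column name missing_column is deliberately wrong):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object AnalysisExceptionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AnalysisExceptionDemo").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    try {
      // "missing_column" does not exist, so analysis fails before any job runs.
      df.select("missing_column").show()
    } catch {
      case e: AnalysisException => println(s"Analysis failed: ${e.getMessage}")
    }

    spark.stop()
  }
}
```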
Flow of how Spark works internally
Component/Step | Role/Process | Function/Example |
---|---|---|
Driver Program | Entry point for the Spark application | - Manages application lifecycle - Defines RDD transformations and actions |
SparkContext | Acts as the master of the Spark application | - Connects to cluster manager - Coordinates tasks |
Cluster Manager | Manages the cluster of machines | - Allocates resources to Spark applications - Examples: YARN, Mesos, standalone |
Executors | Worker nodes that run tasks and store data | - Execute assigned code - Return results to the driver - Cache data in memory for quick access |
Spark Application Submission | Submitting the driver program to the cluster manager | - Example: Submitting a job using spark-submit |
SparkContext Initialization | Driver program initializes SparkContext | - Example: val sc = new SparkContext(conf) |
Job Scheduling | Driver program defines transformations and actions on RDDs/DataFrames | - Example: val rdd = sc.textFile("data.txt").map(line => line.split(" ")) |
DAG (Directed Acyclic Graph) Creation | Constructing a DAG of stages for the job | - Stages are sets of tasks that can be executed in parallel - Example: A series of map and filter transformations create a DAG |
Task Execution | Dividing the DAG into stages, creating tasks, and sending them to executors | - Tasks are distributed across executors - Each executor processes a partition of the data |
Data Shuffling | Exchanging data between nodes during operations like reduceByKey | - Data is grouped by key across nodes - Example: Shuffling data for aggregations |
Result Collection | Executors process the tasks and send the results back to the driver program | - Example: Final results of collect or count are returned to the driver |
Job Completion | Driver program completes the execution | - Example: Driver terminates after executing sc.stop() |
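Putting the steps together, here is a sketch of a small application as it would be submitted to a cluster; the master, jar name, and input path are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Submitted with something like (placeholders for master, class, and jar):
//   spark-submit --master yarn --class WordLength word-length.jar
object WordLength {
  def main(args: Array[String]): Unit = {
    // Driver program: initializes SparkContext, which connects to the cluster manager.
    val conf = new SparkConf().setAppName("WordLength")
    val sc   = new SparkContext(conf)

    // Transformations only build up the DAG; nothing executes yet.
    val lengths = sc.textFile("data.txt")
      .flatMap(_.split(" "))
      .map(word => (word, word.length))

    // The action triggers job scheduling: stages -> tasks -> executors,
    // a shuffle for reduceByKey, and result collection back to the driver.
    val result = lengths.reduceByKey((a, b) => math.max(a, b)).collect()
    result.take(10).foreach(println)

    sc.stop() // Job completion: the driver releases cluster resources.
  }
}
```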
Explain DAG in Spark
Topic | Description | Details | Example |
---|---|---|---|
DAG in Spark | DAG stands for Directed Acyclic Graph. | - Series of steps representing the operations on data. - Directed: Operations flow in one direction. - Acyclic: No cycles or loops. | N/A |
Why We Need DAG | Optimizes Execution, Fault Tolerance, and Parallel Processing. | - Optimizes Execution: Spark can optimize operations. - Fault Tolerance: Recomputes lost data if a node fails. - Parallel Processing: Divides tasks into stages for parallel execution. | N/A |
Without DAG | No Optimization, No Fault Tolerance, and Less Parallelism. | - No Optimization: Operations would run as written, slower performance. - No Fault Tolerance: Inefficient data recomputation. - Less Parallelism: Harder to parallelize tasks. | N/A |
Example | Example of a Spark job and DAG construction. | - Read Data: sc.textFile("file.txt") - Split Lines into Words: data.flatMap(...) - Map Words to Key-Value Pairs: words.map(...) - Reduce by Key: wordCounts.reduceByKey(...) - Collect Results: wordCounts.collect() | val data = sc.textFile("file.txt") val words = data.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.collect() |
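To actually inspect the DAG (lineage) Spark builds for this word count, toDebugString on the final RDD prints it, with an indentation step at each shuffle boundary. A sketch using a local master and a placeholder file path:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DagLineageDemo {
  def main(args: Array[String]): Unit = {
    // local[*] is just for trying this out on one machine.
    val sc = new SparkContext(new SparkConf().setAppName("DagLineageDemo").setMaster("local[*]"))

    // Same word-count shape as the example above; "file.txt" is a placeholder path.
    val wordCounts = sc.textFile("file.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // toDebugString prints the RDD lineage, i.e. the DAG Spark will execute;
    // reduceByKey introduces a shuffle and therefore a new stage.
    println(wordCounts.toDebugString)

    sc.stop()
  }
}
```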
Explain spark.sql.shuffle.partitions Variable
Topic | Description | Details | Example |
---|---|---|---|
spark.sql.shuffle.partitions | Configuration setting for shuffle partitions | - Default Value: 200 partitions - Defines the default number of partitions used when shuffling data for wide transformations | spark.conf.set("spark.sql.shuffle.partitions", "number_of_partitions") |
Purpose | Optimize Performance and Control Data Distribution | - Optimize Performance: Balances workload across the cluster - Control Data Distribution: Manages how data is distributed and processed during shuffle operations | N/A |
When It’s Used | Wide Transformations and SQL Queries | - Wide Transformations: reduceByKey, groupByKey, join, etc. - SQL Queries: Operations involving shuffling data like joins and aggregations | N/A |
How to Set It | Setting via Configuration and spark-submit | - Configuration: spark.conf.set("spark.sql.shuffle.partitions", "number_of_partitions") - spark-submit: spark-submit --conf spark.sql.shuffle.partitions=number_of_partitions ... | N/A |
Example | Default and Custom Settings | - Default Setting: val spark = SparkSession.builder.appName("Example").getOrCreate() println(spark.conf.get("spark.sql.shuffle.partitions")) // Output: 200 - Custom Setting: val spark = SparkSession.builder.appName("Example").getOrCreate() spark.conf.set("spark.sql.shuffle.partitions", "50") val df = spark.read.json("data.json") df.groupBy("column").count().show() | val spark = SparkSession.builder.appName("Example").getOrCreate() println(spark.conf.get("spark.sql.shuffle.partitions")) // Output: 200 |
Why Adjust This Setting? | Small and Large Datasets | - Small Datasets: Reduce the number of partitions to avoid too many small tasks, leading to overhead - Large Datasets: Increase the number of partitions to distribute data evenly and avoid large partitions that slow down processing | N/A |
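A compact sketch tying this together; data.json and the column name are placeholders, and with adaptive query execution enabled Spark may coalesce the shuffle partitions further.

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ShufflePartitionsDemo").getOrCreate()

    // Default is 200 shuffle partitions.
    println(spark.conf.get("spark.sql.shuffle.partitions"))

    // For a small dataset, fewer partitions mean fewer tiny tasks and less overhead.
    spark.conf.set("spark.sql.shuffle.partitions", "50")

    // Any wide transformation (groupBy, join, ...) now shuffles into 50 partitions.
    val df = spark.read.json("data.json")
    val counts = df.groupBy("column").count()
    println(counts.rdd.getNumPartitions) // typically 50; AQE may coalesce this further

    spark.stop()
  }
}
```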