# Common Spark Concepts That May Be Asked in an Interview

In this article I will put some common Spark concepts into a tabular format, so that they stay compact. These are good concepts to remember, and they also come up frequently as interview questions.
## Types of join strategies
| Join Strategy | Description | Use Case | Example |
|---|---|---|---|
| Shuffle Hash Join | Both DataFrames are shuffled based on the join keys, and then a hash join is performed. | Useful when both DataFrames are large and have a good distribution of keys. | spark.conf.set("spark.sql.join.preferSortMergeJoin", "false"); df1.join(df2, "key") |
| Broadcast Hash Join | One of the DataFrames is small enough to fit in memory and is broadcasted to all worker nodes. A hash join is then performed. | Efficient when one DataFrame is much smaller than the other. | val broadcastDF = broadcast(df2); df1.join(broadcastDF, "key") |
| Sort-Merge Join | Both DataFrames are sorted on the join keys and then merged. This requires a shuffle if the data is not already sorted. | Suitable for large DataFrames when the join keys are sorted or can be sorted efficiently. | spark.conf.set("spark.sql.join.preferSortMergeJoin", "true"); df1.join(df2, "key") |
| Cartesian Join (Cross Join) | Every row of one DataFrame is paired with every row of the other DataFrame. | Generally not recommended due to its computational expense, but can be used for generating combinations of all rows. | df1.crossJoin(df2) |
| Broadcast Nested Loop Join | The smaller DataFrame is broadcasted, and a nested loop join is performed. | Used when there are no join keys or the join condition is complex and cannot be optimized with hash or sort-merge joins. | val broadcastDF = broadcast(df2); df1.join(broadcastDF) |
| Shuffle-and-Replicate Nested Loop Join | Both DataFrames are shuffled and replicated to perform a nested loop join. | Used for complex join conditions that cannot be handled by other join strategies. | df1.join(df2, expr("complex_condition")) |
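
To see these strategies in practice, Spark 3.x join hints let you request a particular strategy explicitly; the planner falls back to its own choice if a hint cannot be honored. The sketch below is a minimal illustration with hypothetical toy DataFrames and a made-up "key" column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object JoinStrategyHints {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("JoinStrategyHints").getOrCreate()
    import spark.implicits._

    // Hypothetical toy DataFrames sharing a "key" column
    val df1 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("key", "left_val")
    val df2 = Seq((1, "x"), (2, "y")).toDF("key", "right_val")

    // Broadcast hash join: hint that df2 is small enough to ship to every executor
    df1.join(broadcast(df2), "key").explain()

    // Join hints (Spark 3.x): request shuffle hash or sort-merge explicitly
    df1.hint("shuffle_hash").join(df2, "key").explain()
    df1.hint("merge").join(df2, "key").explain()   // sort-merge join hint

    spark.stop()
  }
}
```

The explain() output shows which physical operator the planner actually chose (BroadcastHashJoin, ShuffledHashJoin, or SortMergeJoin), which is a quick way to verify that a hint or configuration change took effect.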
Types of Joins¶
| Join Type | Description | Example |
|---|---|---|
| Inner Join | Returns rows that have matching values in both DataFrames. | df1.join(df2, "key") |
| Outer Join | Returns all rows when there is a match in either DataFrame. Missing values are filled with nulls. | df1.join(df2, Seq("key"), "outer") |
| Left Outer Join | Returns all rows from the left DataFrame, and matched rows from the right DataFrame. | df1.join(df2, Seq("key"), "left_outer") |
| Right Outer Join | Returns all rows from the right DataFrame, and matched rows from the left DataFrame. | df1.join(df2, Seq("key"), "right_outer") |
| Left Semi Join | Returns only the rows from the left DataFrame that have a match in the right DataFrame. | df1.join(df2, Seq("key"), "left_semi") |
| Left Anti Join | Returns only the rows from the left DataFrame that do not have a match in the right DataFrame. | df1.join(df2, Seq("key"), "left_anti") |
| Cross Join | Returns the Cartesian product of both DataFrames. Every row in the left DataFrame will be combined with every row in the right DataFrame. | df1.crossJoin(df2) |
| Self Join | A join in which a DataFrame is joined with itself. This can be an inner, outer, left, or right join. Alias the DataFrame so the column references are unambiguous. | df.as("a").join(df.as("b"), col("a.key1") === col("b.key2")) |
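
The following sketch exercises several of these join types on two hypothetical DataFrames (the data and column names are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

object JoinTypesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("JoinTypesDemo").getOrCreate()
    import spark.implicits._

    val df1 = Seq((1, "alice"), (2, "bob"), (3, "carol")).toDF("key", "name")
    val df2 = Seq((1, 100), (2, 200), (4, 400)).toDF("key", "amount")

    df1.join(df2, Seq("key"), "inner").show()       // keys 1 and 2 only
    df1.join(df2, Seq("key"), "outer").show()       // all keys, nulls where unmatched
    df1.join(df2, Seq("key"), "left_outer").show()  // all rows from df1
    df1.join(df2, Seq("key"), "left_semi").show()   // df1 rows that have a match; df1 columns only
    df1.join(df2, Seq("key"), "left_anti").show()   // df1 rows with no match in df2

    // Self join: alias the DataFrame so column references stay unambiguous
    val a = df1.as("a")
    val b = df1.as("b")
    a.join(b, $"a.key" === $"b.key").show()

    spark.stop()
  }
}
```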
Common Spark optimization techniques¶
| Technique | What it is | When to use | Example |
|---|---|---|---|
| Caching and Persistence | Storing data in memory for quick access | When you need to reuse the same data multiple times | val cachedData = df.cache() |
| Broadcast Variables | Sending a small dataset to all worker nodes | When one dataset is much smaller than the other | largeDF.join(broadcast(smallDF), "key") |
| Partitioning | Dividing data into smaller, manageable chunks | When dealing with large datasets to improve parallel processing | val partitionedData = df.repartition(10, $"key") |
| Avoiding Shuffles | Reducing the movement of data between nodes | To improve performance by minimizing network overhead | Use mapPartitions instead of groupBy when possible |
| Coalesce | Reducing the number of partitions | When the data has become sparse after a transformation | val coalescedData = df.coalesce(1) |
| Predicate Pushdown | Filtering data as early as possible in the processing | To reduce the amount of data read and processed | val filteredData = df.filter($"column" > 10) |
| Using the Right Join Strategy | Choosing the most efficient way to join two datasets | Based on the size and distribution of data | Prefer broadcast joins for small datasets |
| Tuning Spark Configurations | Adjusting settings to optimize resource usage | To match the workload and cluster resources | SparkSession.builder.config("spark.executor.memory", "4g").getOrCreate() |
| Using DataFrames/Datasets API | Leveraging the high-level APIs for optimizations | To benefit from Catalyst optimizer and Tungsten execution engine | val df = spark.read.csv("data.csv"); df.groupBy("column").count() |
| Vectorized Query Execution | Processing multiple rows of data at a time | For high-performance operations on large datasets | Use built-in SQL functions and DataFrame methods |
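
A minimal sketch combining several of these techniques; the DataFrames, column names, and partition counts below are hypothetical and chosen only for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object OptimizationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("OptimizationSketch").getOrCreate()
    import spark.implicits._

    // Hypothetical large/small pair of DataFrames sharing a "key" column
    val large = spark.range(0, 1000000).withColumnRenamed("id", "key")
    val small = Seq((0L, "zero"), (1L, "one")).toDF("key", "label")

    // Filter early (predicate pushdown / early pruning) to shrink the data first
    val filtered = large.filter($"key" % 2 === 0)

    // Cache the filtered result because it is reused by several actions below
    filtered.cache()

    // Broadcast join: ship the small side to every executor, avoiding a shuffle of `large`
    val joined = filtered.join(broadcast(small), Seq("key"), "left_outer")

    // Repartition on the key used downstream, then coalesce before writing few files
    val repartitioned = joined.repartition(10, $"key")
    println(repartitioned.rdd.getNumPartitions)              // 10
    println(repartitioned.coalesce(1).rdd.getNumPartitions)  // 1

    spark.stop()
  }
}
```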
## Different phases of the Spark SQL engine
| Phase | Description | Details | Example |
|---|---|---|---|
| Parsing | Converting SQL queries into a logical plan. | The SQL query is parsed into an abstract syntax tree (AST). | Converting SELECT * FROM table WHERE id = 1 into an internal format. |
| Analysis | Resolving references and verifying the logical plan. | Resolves column names, table names, and function names; checks for errors. | Ensuring that the table table and the column id exist in the database. |
| Optimization | Improving the logical plan for better performance. | Transforms the logical plan using various optimization techniques; applies rules via Catalyst optimizer. | Reordering filters to reduce the amount of data processed early on. |
| Physical Planning | Converting the logical plan into a physical plan. | Converts the optimized logical plan into one or more physical plans; selects the most efficient plan. | Deciding whether to use a hash join or a sort-merge join. |
| Code Generation | Generating executable code from the physical plan. | Generates Java bytecode to execute the physical plan; this code runs on Spark executors. | Creating code to perform join operations, filter data, and compute results. |
| Execution | Running the generated code on the Spark cluster. | Distributes generated code across the Spark cluster; executed by Spark executors; results collected and returned. | Running join and filter operations on different nodes in the cluster and aggregating results. |
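
You can watch these phases for a concrete query by calling explain(true), which prints the parsed, analyzed, and optimized logical plans along with the selected physical plan. The query, temp view, and data below are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

object ExplainPhases {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ExplainPhases").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
    df.createOrReplaceTempView("table")

    // extended = true prints the parsed, analyzed, and optimized logical plans
    // plus the physical plan chosen during physical planning
    spark.sql("SELECT * FROM table WHERE id = 1").explain(true)

    spark.stop()
  }
}
```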
## Common reasons for AnalysisException in Spark
Here are some common reasons why you might encounter an AnalysisException in Spark:
| Reason | Description | Example |
|---|---|---|
| Non-Existent Column or Table | Column or table specified does not exist. | Referring to a non-existent column id. |
| Ambiguous Column Reference | Same column name exists in multiple tables without qualification. | Joining two DataFrames with the same column name id. |
| Invalid SQL Syntax | SQL query has syntax errors. | Using incorrect SQL syntax like SELCT. |
| Unsupported Operations | Using an operation that Spark SQL does not support. | Using an unsupported function. |
| Schema Mismatch | Schema of the DataFrame does not match the expected schema. | Inserting data with different column types. |
| Missing File or Directory | Specified file or directory does not exist. | Referring to a non-existent CSV file. |
| Incorrect Data Type | Operations expecting a specific data type are given the wrong type. | Performing a math operation on a string column. |
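
A small sketch of the most common case, a reference to a non-existent column, and how AnalysisException can be caught; the DataFrame contents are hypothetical:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object AnalysisExceptionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AnalysisExceptionDemo").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    try {
      // "missing_column" does not exist, so analysis fails before any job runs
      df.select("missing_column").show()
    } catch {
      case e: AnalysisException =>
        println(s"Analysis failed: ${e.getMessage}")
    }

    spark.stop()
  }
}
```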
## Flow of how Spark works internally
| Component/Step | Role/Process | Function/Example |
|---|---|---|
| Driver Program | Entry point for the Spark application | - Manages application lifecycle - Defines RDD transformations and actions |
| SparkContext | Acts as the master of the Spark application | - Connects to cluster manager - Coordinates tasks |
| Cluster Manager | Manages the cluster of machines | - Allocates resources to Spark applications - Examples: YARN, Mesos, standalone |
| Executors | Worker nodes that run tasks and store data | - Execute assigned code - Return results to the driver - Cache data in memory for quick access |
| Spark Application Submission | Submitting the driver program to the cluster manager | - Example: Submitting a job using spark-submit |
| SparkContext Initialization | Driver program initializes SparkContext | - Example: val sc = new SparkContext(conf) |
| Job Scheduling | Driver program defines transformations and actions on RDDs/DataFrames | - Example: val rdd = sc.textFile("data.txt").map(line => line.split(" ")) |
| DAG (Directed Acyclic Graph) Creation | Constructing a DAG of stages for the job | - Stages are sets of tasks that can be executed in parallel - Example: A series of map and filter transformations create a DAG |
| Task Execution | Dividing the DAG into stages, creating tasks, and sending them to executors | - Tasks are distributed across executors - Each executor processes a partition of the data |
| Data Shuffling | Exchanging data between nodes during operations like reduceByKey | - Data is grouped by key across nodes - Example: Shuffling data for aggregations |
| Result Collection | Executors process the tasks and send the results back to the driver program | - Example: Final results of collect or count are returned to the driver |
| Job Completion | Driver program completes the execution | - Example: Driver terminates after executing sc.stop() |
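
Putting the flow together, here is a minimal word-count driver program (the object name and the input path data.txt are hypothetical). The transformations only build up the lineage; the collect() action is what triggers job scheduling, DAG creation, task execution, shuffling, and result collection:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InternalFlowDemo {
  def main(args: Array[String]): Unit = {
    // Driver program: creates the SparkContext, which talks to the cluster manager
    val conf = new SparkConf().setAppName("InternalFlowDemo")
    val sc = new SparkContext(conf)

    // Transformations only record lineage; nothing executes yet
    val wordCounts = sc.textFile("data.txt")        // hypothetical input path
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                           // causes a shuffle at runtime

    // Action: the driver submits the job, stages and tasks run on executors,
    // and the results are collected back to the driver
    wordCounts.collect().foreach(println)

    sc.stop()
  }
}
```

Submitted with something like spark-submit --class InternalFlowDemo app.jar, the driver registers with the cluster manager, executors are launched on worker nodes, and the results of collect() are returned to the driver before sc.stop() ends the application.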
## Explain DAG in Spark
| Topic | Description | Details | Example |
|---|---|---|---|
| DAG in Spark | DAG stands for Directed Acyclic Graph. | - Series of steps representing the operations on data. - Directed: Operations flow in one direction. - Acyclic: No cycles or loops. | N/A |
| Why We Need DAG | Optimizes Execution, Fault Tolerance, and Parallel Processing. | - Optimizes Execution: Spark can optimize operations. - Fault Tolerance: Recomputes lost data if a node fails. - Parallel Processing: Divides tasks into stages for parallel execution. | N/A |
| Without DAG | No Optimization, No Fault Tolerance, and Less Parallelism. | - No Optimization: Operations would run as written, slower performance. - No Fault Tolerance: Inefficient data recomputation. - Less Parallelism: Harder to parallelize tasks. | N/A |
| Example | Example of a Spark job and DAG construction. | - Read Data: sc.textFile("file.txt") - Split Lines into Words: data.flatMap(...) - Map Words to Key-Value Pairs: words.map(...) - Reduce by Key: wordCounts.reduceByKey(...) - Collect Results: wordCounts.collect() | val data = sc.textFile("file.txt"); val words = data.flatMap(line => line.split(" ")); val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _); wordCounts.collect() |
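As a quick way to inspect the lineage that Spark compiles into a DAG, toDebugString can be printed for an RDD before any action runs. The sketch below uses a small hypothetical dataset and a made-up object name:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DagInspection {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DagInspection"))

    // A small lineage: narrow transformations followed by a shuffle
    val counts = sc.parallelize(Seq("a", "b", "a", "c"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // toDebugString prints the lineage Spark turns into a DAG of stages;
    // the indentation marks shuffle boundaries, i.e. where a new stage begins
    println(counts.toDebugString)

    counts.collect()   // running an action also makes the DAG visible in the Spark UI
    sc.stop()
  }
}
```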
## Explain spark.sql.shuffle.partitions Variable
| Topic | Description | Details | Example |
|---|---|---|---|
| spark.sql.shuffle.partitions | Configuration setting for shuffle partitions | - Default Value: 200 partitions - Defines the default number of partitions used when shuffling data for wide transformations | spark.conf.set("spark.sql.shuffle.partitions", "number_of_partitions") |
| Purpose | Optimize Performance and Control Data Distribution | - Optimize Performance: Balances workload across the cluster - Control Data Distribution: Manages how data is distributed and processed during shuffle operations | N/A |
| When It's Used | Wide Transformations and SQL Queries | - Wide Transformations: reduceByKey, groupByKey, join, etc. - SQL Queries: Operations involving shuffling data like joins and aggregations | N/A |
| How to Set It | Setting via Configuration and spark-submit | - Configuration: spark.conf.set("spark.sql.shuffle.partitions", "number_of_partitions") - spark-submit: spark-submit --conf spark.sql.shuffle.partitions=number_of_partitions ... | N/A |
| Example | Default and Custom Settings | - Default Setting: val spark = SparkSession.builder.appName("Example").getOrCreate(); println(spark.conf.get("spark.sql.shuffle.partitions")) // Output: 200 - Custom Setting: val spark = SparkSession.builder.appName("Example").getOrCreate(); spark.conf.set("spark.sql.shuffle.partitions", "50"); val df = spark.read.json("data.json"); df.groupBy("column").count().show() | val spark = SparkSession.builder.appName("Example").getOrCreate(); println(spark.conf.get("spark.sql.shuffle.partitions")) // Output: 200 |
| Why Adjust This Setting? | Small and Large Datasets | - Small Datasets: Reduce the number of partitions to avoid too many small tasks, leading to overhead - Large Datasets: Increase the number of partitions to distribute data evenly and avoid large partitions that slow down processing | N/A |
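
A short sketch of the setting in practice, using a hypothetical tiny DataFrame. Note that with Adaptive Query Execution enabled (the default in recent Spark versions) the post-shuffle partition count may be coalesced below the configured value:

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ShufflePartitionsDemo").getOrCreate()
    import spark.implicits._

    println(spark.conf.get("spark.sql.shuffle.partitions"))   // 200 by default

    val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("column", "value")

    // Up to 200 post-shuffle partitions with the default setting (AQE may coalesce further)
    println(df.groupBy("column").count().rdd.getNumPartitions)

    // Lower the setting for a small dataset to avoid many tiny tasks
    spark.conf.set("spark.sql.shuffle.partitions", "4")
    println(df.groupBy("column").count().rdd.getNumPartitions) // at most 4

    spark.stop()
  }
}
```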