Creating a SparkSession

   from pyspark.sql import SparkSession

   # Get an existing SparkSession or create a new one
   spark = SparkSession.builder.appName("xxx").getOrCreate()
   # Read a CSV, using the first row as headers and inferring column types
   df = spark.read.csv("abc.csv", header=True, inferSchema=True)

Creating a DataFrame

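A minimal sketch of creating a DataFrame directly from Python data (the data and column names are illustrative):

   # Build a DataFrame from a list of tuples plus a list of column names
   data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
   df = spark.createDataFrame(data, ["Name", "Age"])
   df.show()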

Showing DataFrame rows and describing tables

df.show()

   df.show()                      # Print the first 20 rows (the default)
   df.show(n=10, truncate=False)  # Print 10 rows without truncating long values

display(df)

display() is a notebook function (available in Databricks, Fabric, and Synapse notebooks) that renders the DataFrame as an interactive table.

   display(df.limit(3))

df.describe()

describe is used to generate descriptive statistics of the DataFrame. For numeric columns the results include count, mean, stddev, min, and max; for string columns only count, min, and max are meaningful (mean and stddev come back null). For percentiles as well, use df.summary().

   df.describe()         # Returns a new DataFrame of statistics
   df.describe().show()  # Call show() to actually print them

   df.printSchema()  # Print the schema as a tree
   df.columns        # Python list of column names

DESCRIBE FORMATTED tableName

   spark.sql("DESCRIBE FORMATTED tableName")

SQL

CREATE OR REPLACE VIEW doesn't work in Fabric, Azure Synapse, ADF, and similar T-SQL environments. Instead, drop the object first if it exists:

   -- The same pattern works for views (swap in DROP VIEW)
   IF EXISTS (SELECT * FROM sys.sysobjects WHERE name = 'apple')
       DROP TABLE dbo.apple;
   GO

Dropping a table

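A minimal sketch of dropping a table from PySpark (the table name is a placeholder):

   # IF EXISTS avoids an error when the table is not there
   spark.sql("DROP TABLE IF EXISTS table_name")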

Small dataframe

limit() returns a new DataFrame with at most the given number of rows:

   df.limit(100)

  1. Selecting Columns
    df.select("column1", "column2").show()
    
  2. Filtering Data
    df.filter(df["column"] > value).show()
    df.filter(df["column"] == "value").show()
    
  3. Adding Columns
    df.withColumn("new_column", df["existing_column"] * 2).show()
    
  4. Renaming Columns
    df.withColumnRenamed("old_name", "new_name")
    

  5. Dropping Columns
    df.drop("column_name")
    
  6. Grouping and Aggregating
    df.groupBy("column").count().show()
    df.groupBy("column").agg({"column2": "avg", "column3": "sum"}).show()
    

  7. Sorting Data
    df.orderBy("column").show()
    df.orderBy(df["column"].desc()).show()
    
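Several of these chain naturally; a minimal sketch (column names are placeholders):

   from pyspark.sql.functions import col

   # Select two columns, filter, rename, and sort in one chain
   (df.select("Name", "Age")
      .filter(col("Age") > 30)
      .withColumnRenamed("Name", "FullName")
      .orderBy(col("Age").desc())
      .show())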

RDD Operations

  1. Creating an RDD
    rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
    
  2. Transformations
    rdd2 = rdd.map(lambda x: x * 2)
    rdd3 = rdd.filter(lambda x: x % 2 == 0)
    
  3. Actions
    rdd.collect()
    rdd.count()
    rdd.first()
    rdd.take(3)
    
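These compose end to end; a minimal sketch:

   rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
   # Keep the even numbers, double them, then collect to the driver
   result = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * 2).collect()
   print(result)  # [4, 8]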

SQL Operations

  1. Creating Temp View
    df.createOrReplaceTempView("table_name")
    
  2. Running SQL Queries
    spark.sql("SELECT * FROM table_name").show()
    
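For example (the view and column names are placeholders):

   df.createOrReplaceTempView("people")
   # A temp view is scoped to the current SparkSession
   spark.sql("SELECT Name, Age FROM people WHERE Age > 30").show()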

Saving Data

  1. Saving as CSV
    df.write.csv("path/to/save.csv")
    
  2. Saving as Parquet
    df.write.parquet("path/to/save.parquet")
    
  3. Saving to Hive
    df.write.saveAsTable("table_name")
    

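A typical write also specifies a save mode; a minimal sketch (the path is a placeholder):

   # overwrite replaces existing output; other modes are append, ignore, and error
   df.write.mode("overwrite").parquet("path/to/save.parquet")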

Miscellaneous

  1. Caching and Unpersisting DataFrames
    df.cache()
    df.unpersist()
    
  2. Explain Plan
    df.explain()
    
  3. Repartitioning Data
    df.repartition(10)  # Full shuffle into 10 partitions
    df.coalesce(5)      # Narrow (no-shuffle) reduction to 5 partitions
    
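Putting these together (a minimal sketch):

   df.cache()       # Mark df for caching; nothing is stored yet
   df.count()       # An action populates the cache
   df.explain()     # The plan now typically shows an in-memory scan
   df.unpersist()   # Release the cached data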

PySpark when(condition).otherwise(default)

when()/otherwise() builds a conditional Column expression; attach it to the DataFrame with withColumn:

   from pyspark.sql.functions import col, when

   df = df.withColumn("IsOver16", when(col("Age") > 16, True).otherwise(False))

Remember

In SQL, every non-aggregated column in the SELECT list must also appear in the GROUP BY clause.
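For example, with a hypothetical sales table:

   # category is not aggregated, so it must appear in GROUP BY
   spark.sql("""
       SELECT category, SUM(amount) AS total
       FROM sales
       GROUP BY category
   """).show()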

DENSE_RANK() returns the rank of each row within the result set partition with no gaps in the ranking values; RANK() leaves gaps in the ranking after ties.
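A minimal sketch of the difference using PySpark window functions (the score column is illustrative):

   from pyspark.sql import Window
   from pyspark.sql.functions import col, rank, dense_rank

   w = Window.orderBy(col("score").desc())
   # Given tied scores 90, 90, 85: rank() gives 1, 1, 3; dense_rank() gives 1, 1, 2
   df.withColumn("rank", rank().over(w)) \
     .withColumn("dense_rank", dense_rank().over(w)) \
     .show()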