Transformation


Perform Data Validation and Quality Checks

Before you organize your data into a specific model or format, it's crucial to ensure the data is correct and of high quality. Validation includes checking for correct data types, missing or null values, adherence to a predefined schema, and other business rule validations. This step is essential to avoid the "garbage in, garbage out" problem.

Data Validation and Quality Check Strategy

Validation Points:

  1. Schema Validation: Ensure fields like temp_c, lat, lon, etc., are of the correct data types (e.g., float, integer).

  2. Range Checks: Validate that values fall within expected ranges, e.g., latitude between -90 and 90.

  3. Consistency Checks: Cross-check related data points (e.g., temp_c and temp_f should match under the Celsius/Fahrenheit conversion).

  4. Temporal Consistency: Check for correct time zone conversions and alignment of localtime.

  5. Missing Data: Look for nulls or missing values where they are not expected.

  6. Duplication Checks: Ensure there are no duplicate records.

Script Overview

This script uses a combination of Great Expectations, Pandera, and Pydantic for data validation and is designed to run in a Spark environment. The objective is to show how these popular validation libraries can work together.

  1. Data Reading: Leverage Spark to read JSON files from ADLS.
  2. Validation:
     • Use Pydantic for schema validation.
     • Implement Pandera for range and consistency checks.
     • Utilize Great Expectations for more complex validations such as temporal consistency, missing data, and duplication checks.
  3. Data Modeling: Define a data model, perhaps a star schema for data warehousing.
  4. Parquet Conversion: Prepare for converting the validated and modeled data into Parquet (to be detailed later).

Python Script Skeleton

from pyspark.sql import SparkSession
from pydantic import BaseModel, ValidationError, validator
from pandera import DataFrameSchema, Column, Check
from pandera.errors import SchemaError
import great_expectations as ge
import pandas as pd

# Part 1: Spark Session Initialization
spark = SparkSession.builder.appName("WeatherDataProcessor").getOrCreate()

# Part 2: Read JSON files from ADLS (based on your ADLSSorter script)
# [ ... Existing ADLSSorter code to fetch files ... ]

# Pydantic model for schema validation
class WeatherData(BaseModel):
    lat: float
    lon: float
    temp_c: float
    temp_f: float
    localtime_epoch: int

    @validator('temp_f')
    def temp_celsius_fahrenheit_match(cls, v, values, **kwargs):
        # temp_c is declared before temp_f, so it is available in `values` here
        temp_c = values.get('temp_c')
        if temp_c is not None and abs(v - (temp_c * 9/5 + 32)) > 0.1:
            raise ValueError('temp_c and temp_f do not match')
        return v

# Pandera schema for range and consistency checks
weather_schema = DataFrameSchema({
    "lat": Column(float, Check(lambda x: -90 <= x <= 90)),
    "lon": Column(float, Check(lambda x: -180 <= x <= 180)),
    # ... additional range checks ...
})

# Great Expectations for more advanced checks
def validate_with_great_expectations(df):
    # Define expectations (temporal consistency, missing data, etc.)
    # ge_df = ge.from_pandas(df)
    # ge_df.expect_column_values_to_be_between(...)
    # ... define other expectations ...
    pass

# Main processing loop
for file_path in old_files:
    # Read data
    df = spark.read.json(file_path)
    pd_df = df.toPandas()

    # Validate data
    # Pydantic: validate record by record against the schema model
    for record in pd_df.to_dict(orient="records"):
        try:
            WeatherData(**record)
        except ValidationError as e:
            print(f"Validation error: {e}")

    # Pandera
    try:
        weather_schema.validate(pd_df)
    except SchemaError as e:
        print(f"Schema validation error: {e}")

    # Great Expectations
    validate_with_great_expectations(pd_df)

    # Data Modeling and Transformation (to be defined)
    # ...

# Note: Conversion to Parquet will be handled in the next phase.
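
For reference, here is one way the validate_with_great_expectations stub could be filled in. It is a minimal sketch assuming the legacy ge.from_pandas API and the column names used in the skeleton; the specific expectations and the uniqueness key are illustrative, not prescriptive.

# One possible implementation of the validate_with_great_expectations stub
# (assumes the legacy ge.from_pandas interface and the columns used above).
def validate_with_great_expectations(df):
    ge_df = ge.from_pandas(df)

    # Missing data: these columns should never be null
    ge_df.expect_column_values_to_not_be_null("temp_c")
    ge_df.expect_column_values_to_not_be_null("localtime_epoch")

    # Range check expressed as an expectation
    ge_df.expect_column_values_to_be_between("lat", -90, 90)

    # Duplication check (assumes one reading per timestamp is expected)
    ge_df.expect_column_values_to_be_unique("localtime_epoch")

    # Run all registered expectations and report the outcome
    results = ge_df.validate()
    if not results.success:
        print(f"Great Expectations validation failed: {results}")
    return results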

Create a Data Model

Once you're confident in your data quality, the next step is to structure or model the data. This could mean transforming raw data into a more meaningful and usable format, aligning it with a dimensional model (like a star schema), or preparing it for specific analytical needs. This stage is where you'd typically perform operations like filtering, grouping, aggregating, or joining different data sets.
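
To make this concrete, the sketch below shows one possible star-schema split of the validated weather data in PySpark. The DataFrame name validated_df (standing in for the output of the checks above) and the choice of dimension columns are assumptions; adapt them to the fields you actually keep.

from pyspark.sql import functions as F

# Dimension table: one row per distinct location (lat/lon assumed to identify it)
dim_location = (
    validated_df
    .select("lat", "lon")          # add name/region columns if you keep them
    .dropDuplicates(["lat", "lon"])
    .withColumn("location_id", F.monotonically_increasing_id())
)

# Fact table: one row per observation, keyed to the location dimension
fact_weather = (
    validated_df
    .join(dim_location, on=["lat", "lon"], how="left")
    .select("location_id", "localtime_epoch", "temp_c", "temp_f")
)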

Benefits of a data model

Storing files as Parquet without a specific schema or structure is okay if you're merely archiving data or doing simple, infrequent analytics. However, for more complex analytics and reporting, creating a proper data model can have several benefits, like:

a) Faster Query Performance: By organizing data into a structured model (like a star or snowflake schema), you can optimize query performance. The data model reduces the amount of data scanned during queries.

b) Understandability and Consistency: Having a defined schema makes it easier for data scientists, analysts, and other stakeholders to understand the data. It ensures everyone is working with the data in a consistent manner.

c) Joining Multiple Data Sources: If you have (or plan to have) multiple data sources, a structured data model simplifies joining them. For instance, weather data can be enriched with location data, demographic data, etc.

d) Data Integrity: A structured data model, especially when coupled with a database or data warehouse, can ensure data integrity with primary and foreign key constraints.

e) Improved Data Quality: Data models can have defined constraints, ensuring that incoming data meets specific quality standards.

Star Schema: A commonly used schema in data warehousing. It includes a central fact table (e.g., hourly weather measurements) and surrounding dimension tables (e.g., location, date, time). It's simple and often results in fast query performance.

Snowflake Schema: A normalized version of the star schema. It can save storage but may result in more complex queries.

Considerations

When deciding whether to structure your Parquet files according to a data model, consider:

  • The types of queries you'll be running.
  • The expected volume of data.
  • The frequency of data access.
  • Whether you plan to integrate with other data sources in the future.


Conversion to Parquet

After the data is validated and properly modeled, converting it into an efficient storage format like Parquet is the final step. Parquet is a columnar storage format offering efficient data compression and encoding schemes. It is well suited to analytical query workloads and works well with big data technologies. Converting to Parquet after validation and modeling ensures that you're storing high-quality, well-structured data, making your analytics processes more efficient.

Conversion to Parquet Strategy

When considering a robust data storage strategy, especially for data analytics and long-term storage, the structure and features of Parquet come into play. Parquet is a columnar storage file format, which is optimized for use with big data processing tools like Apache Spark, Apache Hive, and many others. Here's what you should consider for the best Parquet structure:

  1. Columnar Storage: Take advantage of Parquet's columnar storage format. This means when querying specific columns, only those columns' data will be read, resulting in improved performance and reduced I/O.

  2. Schema Evolution: One of the significant advantages of Parquet is its ability to handle schema evolution. Make sure your solution can accommodate changes to the schema over time without affecting the existing data.

  3. Compression: Parquet supports various compression techniques like SNAPPY, GZIP, and more. Depending on your analytics use-case, select the compression method that offers a good trade-off between storage cost and query performance.

  4. Partitioning: For your use-case, since you are already organizing by year, month, day, and hour, you should partition the Parquet files this way. This will speed up query times since only the relevant partitions need to be read (see the write sketch after this list).
     Example: /year=2023/month=10/day=17/hour=13/data.parquet

  5. Row Group Size: Parquet organizes data into smaller row groups, allowing for more efficient column pruning. Adjusting the size of row groups can have a performance impact. The default is typically 128MB, but you might want to adjust based on your typical query patterns.

  6. Metadata: Parquet files store metadata about the data they contain, which helps in understanding the schema and optimizing queries. Ensure this metadata is kept up-to-date.

  7. Consistent Schema: Ensure that the schema for your Parquet files remains consistent, especially if you're ingesting data regularly. Any changes in the incoming JSON schema should be handled gracefully.

  8. Data Lake Integration: Since you're using Azure Data Lake Storage, ensure that the tools you're using for analytics are well-integrated with ADLS and can take full advantage of the features both ADLS and Parquet provide.

  9. Regular Compaction: Over time, as data gets updated or deleted, you might end up with many small Parquet files. This is suboptimal for query performance. Implement a regular compaction process to combine these smaller files into larger, more efficient Parquet files.

  10. Avoiding Too Many Small Files: If your ingestion process creates too many small files, it can degrade performance. Consider batching incoming data to create larger Parquet files.
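
As a minimal sketch of the compression, partitioning, and compaction points above: the PySpark snippet below writes SNAPPY-compressed Parquet partitioned by year/month/day/hour. The fact_weather DataFrame comes from the earlier modeling sketch, and the ADLS path, partition-column names, and compaction target are placeholders to adapt.

from pyspark.sql import functions as F

# Derive partition columns from the epoch timestamp (column names are assumptions)
ts = F.from_unixtime("localtime_epoch").cast("timestamp")
partitioned_df = (
    fact_weather
    .withColumn("year", F.year(ts))
    .withColumn("month", F.month(ts))
    .withColumn("day", F.dayofmonth(ts))
    .withColumn("hour", F.hour(ts))
)

# Write partitioned, SNAPPY-compressed Parquet to ADLS (path is a placeholder)
(
    partitioned_df.write
    .mode("append")
    .option("compression", "snappy")
    .partitionBy("year", "month", "day", "hour")
    .parquet("abfss://<container>@<account>.dfs.core.windows.net/weather/fact_weather")
)

# Simple compaction idea: periodically rewrite a partition's many small files
# into fewer, larger ones, e.g.
# spark.read.parquet(base_path + "/year=2023/month=10") \
#     .coalesce(8).write.mode("overwrite").parquet(compacted_path)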

Given the JSON structure you provided, you might want to flatten it out a bit for more effective columnar storage, unless you're often querying multiple subfields of location or current together. The columnar nature of Parquet means that nesting can sometimes reduce performance benefits, especially if the data is queried column-by-column.
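
A hedged sketch of that flattening in PySpark, assuming the raw JSON carries location and current structs with the fields used earlier (the input path is a placeholder):

from pyspark.sql import functions as F

# Flatten the nested location/current structs into top-level columns
raw_df = spark.read.json("path/to/raw_weather.json")

flat_df = raw_df.select(
    F.col("location.lat").alias("lat"),
    F.col("location.lon").alias("lon"),
    F.col("location.localtime_epoch").alias("localtime_epoch"),
    F.col("current.temp_c").alias("temp_c"),
    F.col("current.temp_f").alias("temp_f"),
)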


Appendix

Data Processing Libraries Overview

Data Validation Libraries

| Library | Why | Use Cases |
|---|---|---|
| Great Expectations | Provides a robust suite for JSON data testing, documentation, and validation, ideal for complex structures with clear, definable rules and expectations, ensuring type, range, structure, and content accuracy. Works with both Spark and pandas DataFrames. | Extensive data quality checks, complex validation rules, data documentation. |
| Pandera | Provides a flexible and expressive API for pandas DataFrame validation, allowing for easy statistical checks, type validation, and more. | When working with pandas for data manipulation and needing validation tightly coupled with these operations. |
| Pydantic | Used primarily for data parsing and validation, with a strong emphasis on strict type validation via Python type annotations. | Best for scenarios where you are dealing with JSON-like data structures, needing strong type checks and simple data validation. |

Data Modeling Libraries

| Library | Why | Use Cases |
|---|---|---|
| Pandas | Extremely popular for data manipulation and analysis in Python, with a very straightforward, user-friendly API. | Small to medium-sized datasets that fit on a single machine. |
| PySpark | Best for large-scale data processing. It can handle very large datasets that don't fit into a single machine's memory. | Large datasets, distributed computing, or integrating with other components in a big data ecosystem. |

Data Format Conversion Libraries

| Library | Why | Use Cases |
|---|---|---|
| PyArrow | Provides a bridge between the columnar Parquet storage format and Python data analysis libraries. It's fast and efficient. | High-speed data serialization/deserialization, working with Parquet files, large datasets. |
| Pandas | Directly supports reading and writing Parquet files (using PyArrow or fastparquet under the hood). | If you're already using Pandas for data manipulation, converting to/from Parquet is very straightforward. |
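
For the Pandas/PyArrow route recommended below, a minimal conversion sketch (file names and tuning values are placeholders):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

pd_df = pd.read_json("weather.json")        # placeholder input file
table = pa.Table.from_pandas(pd_df)         # convert to an Arrow table
pq.write_table(
    table,
    "weather.parquet",
    compression="snappy",                   # trade-off: file size vs. speed
    row_group_size=100_000,                 # rows per row group; tune to query patterns
)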

Conclusion

  • Great Expectations was chosen for its comprehensive data validation and documentation capabilities, key for ensuring data quality in analytics and reporting pipelines.
  • PyArrow is recommended for handling large data volumes and efficient Parquet format conversion due to its performance and direct support for the format.
  • Pandas can be a streamlined choice for moderate-sized data, offering both data manipulation and validation (with Pandera for added validation support), and easy Parquet conversion.

Choice of Platform

| Stage | Objective | Azure Product |
|---|---|---|
| Data Validation | Ensure JSON consistency, completeness, and data types. | Azure Databricks (use PySpark for large-scale data processing and validation) |
| Conversion to Parquet | Convert JSON to Parquet for efficient storage/querying. | Azure Databricks (native support for JSON & Parquet via PySpark) |
| Organize Data into a Data Model | Structure Parquet files for optimal analytics/reporting. | Azure Data Factory (for complex transformation logic); Azure Databricks (PySpark DataFrame API for reshaping data) |
| Storage | Store transformed Parquet files securely and efficiently. | Azure Data Lake Storage Gen2 (optimized for Azure analytics platforms) |
| Analytics & Querying | Run analytics and queries on data. | Azure Synapse Analytics (for massive parallel processing); Azure Databricks (for deeper analytics/ML) |
| Monitoring and Maintenance | Monitor pipeline health and performance. | Azure Monitor and Azure Log Analytics (full-stack monitoring, advanced analytics) |
| Data Security | Secure data at rest and in transit. | Azure Data Lake Storage Gen2 (encryption at rest); Azure Key Vault (manage cryptographic keys/secrets) |
| Automation and Scheduling | Automate the pipeline processes. | Azure Data Factory (define and orchestrate data-driven workflows) |

© D Das
📧 das.d@hotmail.com | ddasdocs@gmail.com