Practical Pipeline Project

Overview

In this lesson, we'll integrate all the technologies learned so far to build a real data pipeline. We'll design an end-to-end pipeline using Airflow for orchestration, Spark for large-scale processing, dbt for transformations, and Great Expectations for quality validation.


1. Project Overview

1.1 Scenario

┌────────────────────────────────────────────────────────────────┐
│                 E-Commerce Analytics Pipeline                  │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│   Business Requirements:                                       │
│   - Daily sales analysis dashboard                             │
│   - Customer segmentation                                      │
│   - Inventory optimization alerts                              │
│                                                                │
│   Data Sources:                                                │
│   - PostgreSQL: Orders, customers, products                    │
│   - S3: Clickstream logs (JSON)                                │
│   - Kafka: Real-time inventory events                          │
│                                                                │
│   Outputs:                                                     │
│   - Data Warehouse: Snowflake/BigQuery                         │
│   - BI Dashboard: Looker/Tableau                               │
│   - Alert System: Slack/Email                                  │
│                                                                │
└────────────────────────────────────────────────────────────────┘

1.2 Architecture

┌────────────────────────────────────────────────────────────────┐
│                     Pipeline Architecture                      │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│   Data Sources                                                 │
│   ┌────────────┐ ┌────────────┐ ┌────────────┐                 │
│   │ PostgreSQL │ │  S3 Logs   │ │   Kafka    │                 │
│   └─────┬──────┘ └─────┬──────┘ └─────┬──────┘                 │
│         │              │              │                        │
│         └──────────────┼──────────────┘                        │
│                        ↓                                       │
│   ┌────────────────────────────────────────────────────────┐   │
│   │                Airflow (Orchestration)                 │   │
│   └────────────────────────────────────────────────────────┘   │
│                        ↓                                       │
│   ┌────────────────────────────────────────────────────────┐   │
│   │                     Data Lake (S3)                     │   │
│   │   ┌──────────┐   ┌──────────┐   ┌──────────┐           │   │
│   │   │  Bronze  │ → │  Silver  │ → │   Gold   │           │   │
│   │   │  (Raw)   │   │ (Cleaned)│   │ (Curated)│           │   │
│   │   └──────────┘   └──────────┘   └──────────┘           │   │
│   └────────────────────────────────────────────────────────┘   │
│                        ↓                                       │
│   ┌────────────────────┬─────────────────┬─────────────────┐   │
│   │ Spark (Processing) │ dbt (Transform) │ GE (Quality)    │   │
│   └────────────────────┴─────────────────┴─────────────────┘   │
│                        ↓                                       │
│   ┌────────────────────────────────────────────────────────┐   │
│   │         Data Warehouse (Snowflake / BigQuery)          │   │
│   └────────────────────────────────────────────────────────┘   │
│                        ↓                                       │
│   ┌────────────┐ ┌────────────┐ ┌────────────┐                 │
│   │  BI Tool   │ │ ML Models  │ │   Alerts   │                 │
│   └────────────┘ └────────────┘ └────────────┘                 │
│                                                                │
└────────────────────────────────────────────────────────────────┘
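
The Data Lake box follows the Bronze/Silver/Gold layout that the DAG's S3 paths use (for example `s3://data-lake/bronze/orders/{{ ds }}/`). A minimal sketch of that path convention, assuming a hypothetical `lake_path` helper (not part of any library) that puts date-partitioned tables under a `<ds>/` folder and full snapshots without one:

```python
from datetime import date
from typing import Optional

LAYERS = ("bronze", "silver", "gold")
BUCKET = "s3://data-lake"  # bucket name taken from the DAG's paths


def lake_path(layer: str, table: str, ds: Optional[date] = None) -> str:
    """Build a Data Lake path for a table in one of the three layers (hypothetical helper)."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer!r}")
    path = f"{BUCKET}/{layer}/{table}/"
    # Daily incremental tables get a date folder; full snapshots do not
    if ds is not None:
        path += f"{ds.isoformat()}/"
    return path


print(lake_path("bronze", "orders", date(2024, 1, 1)))
print(lake_path("silver", "customers"))
```

Keeping the convention in one function means every job and DAG task agrees on where each layer lives.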

2. Project Structure

2.1 Directory Structure

ecommerce_pipeline/
├── airflow/
│   ├── dags/
│   │   ├── daily_etl_dag.py
│   │   ├── hourly_streaming_dag.py
│   │   └── data_quality_dag.py
│   └── plugins/
│       └── custom_operators.py
│
├── spark/
│   ├── jobs/
│   │   ├── extract_postgres.py
│   │   ├── process_clickstream.py
│   │   ├── process_orders.py
│   │   └── aggregate_daily.py
│   └── utils/
│       └── spark_utils.py
│
├── dbt/
│   ├── dbt_project.yml
│   ├── models/
│   │   ├── staging/
│   │   │   ├── stg_orders.sql
│   │   │   ├── stg_customers.sql
│   │   │   └── stg_products.sql
│   │   ├── intermediate/
│   │   │   └── int_order_items.sql
│   │   └── marts/
│   │       ├── _schema.yml
│   │       ├── fct_orders.sql
│   │       ├── dim_customers.sql
│   │       ├── dim_products.sql
│   │       └── agg_daily_sales.sql
│   └── tests/
│       └── assert_positive_amounts.sql
│
├── great_expectations/
│   ├── create_expectations.py
│   ├── expectations/
│   │   ├── orders_suite.json
│   │   └── customers_suite.json
│   └── checkpoints/
│       └── daily_checkpoint.yml
│
├── monitoring/
│   ├── metrics_collector.py
│   └── alerts.py
│
├── docker/
│   ├── docker-compose.yml
│   └── Dockerfile.spark
│
├── tests/
│   ├── test_spark_jobs.py
│   └── test_dbt_models.py
│
└── requirements.txt

3. Airflow DAG Implementation

3.1 Main ETL DAG

# airflow/dags/daily_etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-alerts@company.com'],
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_ecommerce_pipeline',
    default_args=default_args,
    description='Daily e-commerce data pipeline',
    schedule='0 6 * * *',  # Daily at 06:00; `schedule` replaces the deprecated `schedule_interval` (Airflow 2.4+)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['production', 'etl', 'daily'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    # ============================================
    # Extract: Extract from data sources
    # ============================================
    with TaskGroup(group_id='extract') as extract_group:

        extract_orders = SparkSubmitOperator(
            task_id='extract_orders',
            application='/opt/spark/jobs/extract_postgres.py',
            conn_id='spark_default',
            application_args=[
                '--table', 'orders',
                '--date', '{{ ds }}',
                '--output', 's3://data-lake/bronze/orders/{{ ds }}/'
            ],
        )

        extract_customers = SparkSubmitOperator(
            task_id='extract_customers',
            application='/opt/spark/jobs/extract_postgres.py',
            conn_id='spark_default',
            application_args=[
                '--table', 'customers',
                '--output', 's3://data-lake/bronze/customers/'
            ],
        )

        extract_products = SparkSubmitOperator(
            task_id='extract_products',
            application='/opt/spark/jobs/extract_postgres.py',
            conn_id='spark_default',
            application_args=[
                '--table', 'products',
                '--output', 's3://data-lake/bronze/products/'
            ],
        )

        extract_clickstream = SparkSubmitOperator(
            task_id='extract_clickstream',
            application='/opt/spark/jobs/process_clickstream.py',
            conn_id='spark_default',
            application_args=[
                '--date', '{{ ds }}',
                '--input', 's3://raw-logs/clickstream/{{ ds }}/',
                '--output', 's3://data-lake/bronze/clickstream/{{ ds }}/'
            ],
        )

    # ============================================
    # Quality Check: Bronze layer quality validation
    # ============================================
    with TaskGroup(group_id='quality_bronze') as quality_bronze:

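        # Pre-1.0 Great Expectations API (context.run_checkpoint was removed in GX 1.0).
        # Defined inside this `with` block but still visible to quality_gold below,
        # because `with` does not create a new scope.
        def run_great_expectations(checkpoint_name: str, **kwargs):
            import great_expectations as gx
            context = gx.get_context()
            result = context.run_checkpoint(checkpoint_name=checkpoint_name)
            if not result.success:
                raise ValueError(f"Quality check failed: {checkpoint_name}")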

        check_orders = PythonOperator(
            task_id='check_orders_quality',
            python_callable=run_great_expectations,
            op_kwargs={'checkpoint_name': 'bronze_orders_checkpoint'},
        )

        check_customers = PythonOperator(
            task_id='check_customers_quality',
            python_callable=run_great_expectations,
            op_kwargs={'checkpoint_name': 'bronze_customers_checkpoint'},
        )

    # ============================================
    # Transform: Create Silver layer with Spark
    # ============================================
    with TaskGroup(group_id='transform_spark') as transform_spark:

        process_orders = SparkSubmitOperator(
            task_id='process_orders',
            application='/opt/spark/jobs/process_orders.py',
            application_args=[
                '--input', 's3://data-lake/bronze/orders/{{ ds }}/',
                '--output', 's3://data-lake/silver/orders/{{ ds }}/'
            ],
        )

        aggregate_daily = SparkSubmitOperator(
            task_id='aggregate_daily',
            application='/opt/spark/jobs/aggregate_daily.py',
            application_args=[
                '--date', '{{ ds }}',
                '--output', 's3://data-lake/silver/daily_aggregates/{{ ds }}/'
            ],
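        )

        # aggregate_daily reads the Silver orders written by process_orders
        process_orders >> aggregate_daily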

    # ============================================
    # Transform: Create Gold layer with dbt
    # ============================================
    with TaskGroup(group_id='transform_dbt') as transform_dbt:

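        def run_dbt_command(command: str, **kwargs):
            import shlex
            import subprocess
            # Pass an argument list (no shell) and set the working directory explicitly
            result = subprocess.run(
                ["dbt", *shlex.split(command), "--profiles-dir", "/opt/dbt"],
                cwd="/opt/dbt",
                capture_output=True,
                text=True,
            )
            print(result.stdout)
            if result.returncode != 0:
                # dbt writes most error details to stdout, so include both streams
                raise RuntimeError(f"dbt {command} failed:\n{result.stdout}\n{result.stderr}")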

        dbt_run = PythonOperator(
            task_id='dbt_run',
            python_callable=run_dbt_command,
            op_kwargs={'command': 'run --select staging marts'},
        )

        dbt_test = PythonOperator(
            task_id='dbt_test',
            python_callable=run_dbt_command,
            op_kwargs={'command': 'test'},
        )

        dbt_run >> dbt_test

    # ============================================
    # Quality Check: Gold layer quality validation
    # ============================================
    quality_gold = PythonOperator(
        task_id='quality_gold',
        python_callable=run_great_expectations,
        op_kwargs={'checkpoint_name': 'gold_checkpoint'},
    )

    # ============================================
    # Notify: Completion notification
    # ============================================
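    def send_completion_notification(**kwargs):
        import requests
        # Placeholder URL; keep the real webhook in an Airflow Connection or Variable
        webhook_url = "https://hooks.slack.com/services/xxx"
        message = {
            "text": f"Daily pipeline completed for {kwargs['ds']}"
        }
        response = requests.post(webhook_url, json=message, timeout=10)
        response.raise_for_status()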

    notify = PythonOperator(
        task_id='notify_completion',
        python_callable=send_completion_notification,
    )

    end = EmptyOperator(task_id='end')

    # Task dependencies
    start >> extract_group >> quality_bronze >> transform_spark >> transform_dbt >> quality_gold >> notify >> end
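
With the daily `'0 6 * * *'` cron schedule, Airflow starts each DAG run once its data interval has closed, and `{{ ds }}` renders the interval's start date. So the run that executes on the morning of 2024-01-02 processes `--date 2024-01-01`. A stdlib-only sketch of that relationship; `ds_for_run` is a hypothetical helper, not an Airflow API:

```python
from datetime import datetime, time, timedelta


def ds_for_run(triggered_at: datetime, run_hour: int = 6) -> str:
    """Return the {{ ds }} of the latest complete daily interval (hypothetical helper).

    Intervals run from run_hour:00 one day to run_hour:00 the next; the run for
    an interval fires at its end, and its logical date (ds) is the interval start.
    """
    interval_end = datetime.combine(triggered_at.date(), time(run_hour))
    if triggered_at < interval_end:
        # Before today's boundary: the latest complete interval ended yesterday
        interval_end -= timedelta(days=1)
    interval_start = interval_end - timedelta(days=1)
    return interval_start.date().isoformat()


print(ds_for_run(datetime(2024, 1, 2, 6, 0)))
```

This is why the extract tasks can treat `{{ ds }}` as a full day of already-closed data.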

4. Spark Processing Jobs

4.1 Data Extraction Job

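# spark/jobs/extract_postgres.py
import argparse
import os
from datetime import date

from pyspark.sql import SparkSession


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--table', required=True)
    parser.add_argument('--date', required=False)
    parser.add_argument('--output', required=True)
    args = parser.parse_args()

    spark = SparkSession.builder \
        .appName(f"Extract {args.table}") \
        .getOrCreate()

    # JDBC read configuration. The PostgreSQL JDBC driver must be on the Spark
    # classpath (e.g. spark-submit --packages org.postgresql:postgresql:<version>).
    # Credentials come from the environment (variable names are illustrative).
    jdbc_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/ecommerce")
    properties = {
        "user": os.environ.get("POSTGRES_USER", "postgres"),
        "password": os.environ["POSTGRES_PASSWORD"],  # fail fast if unset
        "driver": "org.postgresql.Driver"
    }

    # Incremental extraction (when date specified)
    if args.date:
        # Validate the date before interpolating it into SQL
        run_date = date.fromisoformat(args.date)
        query = f"""
            (SELECT * FROM {args.table}
             WHERE DATE(updated_at) = '{run_date.isoformat()}') AS t
        """
    else:
        query = args.table

    # Read data
    df = spark.read.jdbc(
        url=jdbc_url,
        table=query,
        properties=properties
    )

    # Save to Bronze layer
    df.write \
        .mode("overwrite") \
        .parquet(args.output)

    # Count from the written Parquet instead of re-querying PostgreSQL
    row_count = spark.read.parquet(args.output).count()
    print(f"Extracted {row_count} rows from {args.table}")
    spark.stop()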


if __name__ == "__main__":
    main()
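
Because `--table` and `--date` are interpolated directly into SQL, it is worth validating both before building the JDBC subquery. A minimal sketch, assuming a hypothetical `build_incremental_query` helper that accepts only simple (optionally schema-qualified) identifiers and ISO dates; `updated_at` is the column the job above filters on:

```python
import re
from datetime import date
from typing import Optional

# A simple identifier such as "orders" or "public.orders"
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?")


def build_incremental_query(table: str, run_date: Optional[str] = None) -> str:
    """Return a value for spark.read.jdbc(table=...) (hypothetical helper)."""
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"invalid table name: {table!r}")
    if run_date is None:
        # Full extract: pass the table name through unchanged
        return table
    day = date.fromisoformat(run_date)  # raises ValueError for malformed dates
    return (
        f"(SELECT * FROM {table} "
        f"WHERE DATE(updated_at) = '{day.isoformat()}') AS t"
    )


print(build_incremental_query("orders", "2024-01-01"))
```

Spark's JDBC reader accepts a parenthesized subquery with an alias wherever a table name is expected, which is what makes the incremental variant work.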

4.2 Clickstream Processing

# spark/jobs/process_clickstream.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, to_date
from pyspark.sql.types import MapType, StringType, StructField, StructType, TimestampType
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args()

    spark = SparkSession.builder \
        .appName("Process Clickstream") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()

    # Define JSON schema
    schema = StructType([
        StructField("event_id", StringType()),
        StructField("user_id", StringType()),
        StructField("session_id", StringType()),
        StructField("event_type", StringType()),
        StructField("page_url", StringType()),
        StructField("timestamp", TimestampType()),
        StructField("properties", MapType(StringType(), StringType())),
    ])

    # Read JSON
    df = spark.read.schema(schema).json(args.input)

    # Clean and transform
    processed_df = df \
        .filter(col("event_id").isNotNull()) \
        .filter(col("user_id").isNotNull()) \
        .withColumn("event_date", to_date(col("timestamp"))) \
        .withColumn("event_hour", hour(col("timestamp"))) \
        .withColumn("product_id", col("properties").getItem("product_id")) \
        .dropDuplicates(["event_id"]) \
        .select(
            "event_id",
            "user_id",
            "session_id",
            "event_type",
            "page_url",
            "product_id",
            "event_date",
            "event_hour",
            "timestamp"
        )

    # Save partitioned
    processed_df.write \
        .mode("overwrite") \
        .partitionBy("event_date", "event_hour") \
        .parquet(args.output)

    # Count from the written output instead of recomputing the whole JSON read
    print(f"Processed {spark.read.parquet(args.output).count()} events")
    spark.stop()


if __name__ == "__main__":
    main()
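
The Spark transformation drops events without `event_id` or `user_id`, deduplicates on `event_id`, derives `event_date`/`event_hour` from the timestamp, and pulls `product_id` out of the `properties` map. The same rules as a plain-Python sketch, handy for unit-testing the logic without a Spark session. `clean_events` is a hypothetical name, and unlike Spark's `dropDuplicates` (which keeps an arbitrary row) it keeps the first occurrence:

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List


def clean_events(events: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Apply the clickstream cleaning rules to dict records (hypothetical helper)."""
    seen = set()
    cleaned = []
    for event in events:
        # Drop records missing either identifier
        if event.get("event_id") is None or event.get("user_id") is None:
            continue
        # Deduplicate on event_id (first occurrence wins)
        if event["event_id"] in seen:
            continue
        seen.add(event["event_id"])
        ts: datetime = event["timestamp"]
        cleaned.append({
            "event_id": event["event_id"],
            "user_id": event["user_id"],
            "session_id": event.get("session_id"),
            "event_type": event.get("event_type"),
            "page_url": event.get("page_url"),
            # A missing map or key yields None, like getItem on a MapType column
            "product_id": (event.get("properties") or {}).get("product_id"),
            "event_date": ts.date(),
            "event_hour": ts.hour,
            "timestamp": ts,
        })
    return cleaned
```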

4.3 Daily Aggregation

# spark/jobs/aggregate_daily.py
import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced so sum/count don't shadow builtins


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args()
    output = args.output.rstrip("/")  # avoid "//" when appending sub-paths

    spark = SparkSession.builder \
        .appName("Daily Aggregation") \
        .getOrCreate()

    # Read Silver layer. The DAG above only extracts customers/products to
    # Bronze, so a step that cleans them into Silver is assumed here.
    orders = spark.read.parquet(f"s3://data-lake/silver/orders/{args.date}/")
    customers = spark.read.parquet("s3://data-lake/silver/customers/")
    products = spark.read.parquet("s3://data-lake/silver/products/")

    todays_orders = orders.filter(F.col("order_date") == args.date)

    # Daily sales aggregation: category comes from products, region from customers
    daily_sales = todays_orders \
        .join(products, "product_id") \
        .join(customers, "customer_id") \
        .groupBy("order_date", "category", "region") \
        .agg(
            F.count("order_id").alias("order_count"),
            F.sum("amount").alias("total_revenue"),
            F.avg("amount").alias("avg_order_value"),
            F.countDistinct("customer_id").alias("unique_customers")
        )

    # Customer segment aggregation
    customer_segments = todays_orders \
        .join(customers, "customer_id") \
        .groupBy("customer_segment") \
        .agg(
            F.count("order_id").alias("orders"),
            F.sum("amount").alias("revenue")
        )

    # Save
    daily_sales.write \
        .mode("overwrite") \
        .parquet(f"{output}/daily_sales/")

    customer_segments.write \
        .mode("overwrite") \
        .parquet(f"{output}/customer_segments/")

    spark.stop()


if __name__ == "__main__":
    main()
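
To make the aggregate columns concrete: `order_count` counts orders, `total_revenue` and `avg_order_value` are the sum and mean of `amount`, and `unique_customers` counts distinct customers per group. A plain-Python sketch of the `daily_sales` grouping on already-joined rows; the function and the row layout are illustrative assumptions, not part of the job:

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def daily_sales(rows: List[dict]) -> Dict[Tuple[str, str, str], dict]:
    """Group joined order rows by (order_date, category, region) and aggregate."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row["order_date"], row["category"], row["region"])].append(row)

    result = {}
    for key, items in groups.items():
        amounts = [r["amount"] for r in items]
        result[key] = {
            # Row count; equals count("order_id") when no order_id is NULL
            "order_count": len(items),
            "total_revenue": sum(amounts),                   # sum("amount")
            "avg_order_value": sum(amounts) / len(amounts),  # avg("amount")
            "unique_customers": len({r["customer_id"] for r in items}),  # countDistinct
        }
    return result
```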

5. dbt Models

5.1 Staging Models

-- dbt/models/staging/stg_orders.sql
{{
    config(
        materialized='view',
        schema='staging'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('silver', 'orders') }}
),

cleaned AS (
    SELECT
        order_id,
        customer_id,
        product_id,
        CAST(order_date AS DATE) AS order_date,
        CAST(amount AS DECIMAL(12, 2)) AS amount,
        CAST(quantity AS INT) AS quantity,
        status,
        CURRENT_TIMESTAMP AS loaded_at
    FROM source
    WHERE order_id IS NOT NULL
      AND amount > 0
)

SELECT * FROM cleaned

5.2 Mart Models

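-- dbt/models/marts/fct_orders.sql
-- Note: partition_by below uses the BigQuery adapter's format; other adapters
-- (e.g. Snowflake) configure partitioning/clustering differently.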
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        schema='marts',
        partition_by={
            'field': 'order_date',
            'data_type': 'date',
            'granularity': 'day'
        }
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
),

products AS (
    SELECT * FROM {{ ref('dim_products') }}
)

SELECT
    o.order_id,
    o.order_date,

    -- Customer information
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    c.region,

    -- Product information
    o.product_id,
    p.product_name,
    p.category,

    -- Metrics
    o.quantity,
    o.amount AS order_amount,
    p.unit_cost * o.quantity AS cost_amount,
    o.amount - (p.unit_cost * o.quantity) AS profit_amount,

    -- Status
    o.status,

    -- Metadata
    o.loaded_at

FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
LEFT JOIN products p ON o.product_id = p.product_id

{% if is_incremental() %}
-- >= re-reads the latest loaded day; unique_key='order_id' merges any repeats
WHERE o.order_date >= (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
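
With `materialized='incremental'` and `unique_key='order_id'`, each run selects only the newest rows and merges them into the existing table by key: matching `order_id`s are updated and new ones inserted. A simplified plain-Python model of that behaviour (a hypothetical `incremental_merge`; the SQL dbt actually generates depends on the adapter and incremental strategy). The sketch uses a `>=` cutoff, which re-reads the latest loaded day; with `>`, late-arriving changes for that day would be skipped:

```python
from datetime import date
from typing import Dict, List


def incremental_merge(target: Dict[int, dict], source: List[dict]) -> Dict[int, dict]:
    """Merge source rows into target keyed by order_id (simplified model)."""
    merged = dict(target)
    if merged:
        # Incremental run: only rows on or after the latest loaded order_date
        cutoff = max(row["order_date"] for row in merged.values())
        source = [row for row in source if row["order_date"] >= cutoff]
    # First run (empty target, is_incremental() false): load every source row
    for row in source:
        merged[row["order_id"]] = row  # update if the key exists, else insert
    return merged
```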

-- dbt/models/marts/agg_daily_sales.sql
{{
    config(
        materialized='table',
        schema='marts'
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('fct_orders') }}
)

SELECT
    order_date,
    category,
    region,
    customer_segment,

    -- Order metrics
    COUNT(DISTINCT order_id) AS total_orders,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(quantity) AS total_quantity,

    -- Revenue metrics
    SUM(order_amount) AS total_revenue,
    AVG(order_amount) AS avg_order_value,
    SUM(profit_amount) AS total_profit,

    -- Profit margin
    ROUND(SUM(profit_amount) / NULLIF(SUM(order_amount), 0) * 100, 2) AS profit_margin_pct,

    -- Period comparison (with dbt_utils), kept as a Jinja comment because
    -- dbt renders Jinja even inside SQL "--" comments
    {# dbt_utils.date_spine(...) #}

    CURRENT_TIMESTAMP AS updated_at

FROM orders
GROUP BY
    order_date,
    category,
    region,
    customer_segment

6. Quality Validation

6.1 Great Expectations Suite

# great_expectations/create_expectations.py
# Uses the pre-1.0 Great Expectations API; GX 1.0 reorganized these calls.
import great_expectations as gx

context = gx.get_context()

# Orders Suite
orders_suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
    batch_request={"datasource": "orders_datasource", ...},  # placeholder: complete for your datasource
    expectation_suite_name="orders_suite"
)

# Basic validation
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("amount")

# Value range
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1000000)
validator.expect_column_values_to_be_between("quantity", min_value=1, max_value=100)

# Accepted values
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "processing", "shipped", "delivered", "cancelled"]
)

# Table level
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=10000000)

# Referential integrity
# validator.expect_column_values_to_be_in_set(
#     "customer_id",
#     customer_ids_from_dim_table
# )

validator.save_expectation_suite(discard_failed_expectations=False)
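
Each expectation is a rule evaluated over a batch of rows. A plain-Python sketch that applies the same orders rules to a list of dict rows and returns the names of failed checks, only to show what each rule means; `check_orders` and its rule names are hypothetical, and Great Expectations itself reports much richer results. Like GE's column-value expectations, the range and set checks skip null values, and the row-count bounds are parameters so the example can run on a tiny sample:

```python
from typing import Dict, List

ALLOWED_STATUSES = {"pending", "processing", "shipped", "delivered", "cancelled"}


def check_orders(rows: List[Dict], min_rows: int = 1000, max_rows: int = 10_000_000) -> List[str]:
    """Return the names of failed checks (an empty list means the batch passed)."""
    failures = []
    for column in ("order_id", "customer_id", "amount"):
        if any(r.get(column) is None for r in rows):
            failures.append(f"{column}_not_null")
    ids = [r["order_id"] for r in rows if r.get("order_id") is not None]
    if len(ids) != len(set(ids)):
        failures.append("order_id_unique")
    if any(r.get("amount") is not None and not 0 <= r["amount"] <= 1_000_000 for r in rows):
        failures.append("amount_between")
    if any(r.get("quantity") is not None and not 1 <= r["quantity"] <= 100 for r in rows):
        failures.append("quantity_between")
    if any(r["status"] not in ALLOWED_STATUSES for r in rows if r.get("status") is not None):
        failures.append("status_in_set")
    if not min_rows <= len(rows) <= max_rows:
        failures.append("row_count_between")
    return failures
```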

6.2 dbt Tests

# dbt/models/marts/_schema.yml
version: 2

models:
  - name: fct_orders
    description: "Orders fact table"
    tests:
      - dbt_utils.recency:
          datepart: day
          field: order_date
          interval: 1
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: profit_amount
        tests:
          - dbt_utils.expression_is_true:
              expression: "<= order_amount"

  - name: agg_daily_sales
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_date
            - category
            - region
            - customer_segment
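
The `relationships` test on `customer_id` fails when a fact row points at a customer that does not exist in `dim_customers`. Rows whose `customer_id` is NULL are excluded from this test (the separate `not_null` test covers them). A plain-Python sketch of that orphan check, using a hypothetical `find_orphans` helper:

```python
from typing import Iterable, List, Optional


def find_orphans(child_keys: Iterable[Optional[str]], parent_keys: Iterable[str]) -> List[str]:
    """Return child keys with no matching parent key; None (NULL) keys are skipped."""
    parents = set(parent_keys)
    return [key for key in child_keys if key is not None and key not in parents]


print(find_orphans(["c1", "c2", None, "c9"], ["c1", "c2"]))
```

dbt treats any returned row as a failure, so an empty result means the test passes.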

7. Monitoring and Alerts

7.1 Monitoring Dashboard

# monitoring/metrics_collector.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class PipelineMetrics:
    """Pipeline metrics"""
    pipeline_name: str
    run_date: str
    start_time: datetime
    end_time: Optional[datetime] = None
    status: str = "running"
    records_processed: int = 0
    quality_score: float = 0.0
    errors: list = field(default_factory=list)  # a fresh list per instance

    def to_dict(self):
        # total_seconds() covers the full duration; timedelta.seconds drops whole days
        duration = (self.end_time - self.start_time).total_seconds() if self.end_time else None
        return {
            "pipeline_name": self.pipeline_name,
            "run_date": self.run_date,
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "duration_seconds": duration,
            "status": self.status,
            "records_processed": self.records_processed,
            "quality_score": self.quality_score,
            "errors": list(self.errors)
        }


def push_metrics_to_prometheus(metrics: PipelineMetrics):
    """Push metrics to Prometheus"""
    from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

    registry = CollectorRegistry()

    duration = Gauge(
        'pipeline_duration_seconds',
        'Pipeline duration',
        ['pipeline_name'],
        registry=registry
    )
    duration.labels(pipeline_name=metrics.pipeline_name).set(
        (metrics.end_time - metrics.start_time).total_seconds() if metrics.end_time else 0
    )

    records = Gauge(
        'pipeline_records_processed',
        'Records processed',
        ['pipeline_name'],
        registry=registry
    )
    records.labels(pipeline_name=metrics.pipeline_name).set(metrics.records_processed)

    quality = Gauge(
        'pipeline_quality_score',
        'Quality score',
        ['pipeline_name'],
        registry=registry
    )
    quality.labels(pipeline_name=metrics.pipeline_name).set(metrics.quality_score)

    push_to_gateway('localhost:9091', job='data_pipeline', registry=registry)
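
`PipelineMetrics.quality_score` is pushed as a gauge, but the lesson does not define how it is computed. One simple rule, assumed here for illustration, is the share of checks that passed (for example, per-expectation or per-test outcomes collected from Great Expectations and dbt). A sketch with a hypothetical `quality_score` helper:

```python
from typing import Mapping


def quality_score(check_results: Mapping[str, bool]) -> float:
    """Fraction of checks that passed, from 0.0 to 1.0 (hypothetical scoring rule)."""
    if not check_results:
        raise ValueError("no checks were evaluated")
    passed = sum(1 for ok in check_results.values() if ok)
    return passed / len(check_results)


print(quality_score({"order_id_unique": True, "amount_between": False}))
```

The result can be assigned to `PipelineMetrics.quality_score` before calling `push_metrics_to_prometheus`, so a Grafana panel can plot it per run.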

7.2 Alert Configuration

# monitoring/alerts.py
import requests
from typing import Optional

class AlertManager:
    """Alert management"""

    def __init__(self, slack_webhook: str, pagerduty_key: Optional[str] = None):
        self.slack_webhook = slack_webhook
        self.pagerduty_key = pagerduty_key

    def send_slack_alert(self, message: str, severity: str = "info"):
        """Slack notification"""
        color = {
            "info": "#36a64f",
            "warning": "#ffa500",
            "error": "#ff0000"
        }.get(severity, "#36a64f")

        payload = {
            "attachments": [{
                "color": color,
                "text": message,
                "footer": "Data Pipeline Alert"
            }]
        }
        response = requests.post(self.slack_webhook, json=payload, timeout=10)
        response.raise_for_status()

    def send_pagerduty_alert(self, message: str):
        """PagerDuty notification (for critical issues)"""
        if not self.pagerduty_key:
            return

        payload = {
            "routing_key": self.pagerduty_key,
            "event_action": "trigger",
            "payload": {
                "summary": message,
                "severity": "critical",
                "source": "data-pipeline"
            }
        }
        response = requests.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload,
            timeout=10,
        )
        response.raise_for_status()


# Use in Airflow: register as a task callback, e.g.
# default_args = {..., 'on_failure_callback': alert_on_failure}
def alert_on_failure(context):
    """Alert on task failure"""
    alert_manager = AlertManager(
        slack_webhook="https://hooks.slack.com/services/xxx"  # placeholder
    )

    message = "\n".join([
        "Pipeline Failed!",
        f"DAG: {context['dag'].dag_id}",
        f"Task: {context['task'].task_id}",
        f"Execution Date: {context['ds']}",
        f"Error: {context.get('exception', 'Unknown')}",
    ])

    alert_manager.send_slack_alert(message, severity="error")
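
`AlertManager` reserves PagerDuty for critical issues. One way to make that policy explicit is a small routing function; the `critical` severity level and the routing rules below are assumptions for illustration, not part of the lesson's code:

```python
from typing import List

KNOWN_SEVERITIES = {"info", "warning", "error", "critical"}


def route_alert(severity: str, pagerduty_configured: bool) -> List[str]:
    """Return the channels an alert of this severity goes to (hypothetical policy)."""
    if severity not in KNOWN_SEVERITIES:
        raise ValueError(f"unknown severity: {severity!r}")
    channels = ["slack"]
    # Page a human only for critical alerts, and only if a PagerDuty key exists
    if severity == "critical" and pagerduty_configured:
        channels.append("pagerduty")
    return channels


print(route_alert("critical", pagerduty_configured=True))
```

Note that `send_slack_alert` falls back to the `info` color for severities missing from its color map, such as `critical`.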

8. Deployment

8.1 Docker Compose

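# docker/docker-compose.yml
# Host paths are relative to this file's directory (docker/), hence "../".
# The stock apache/airflow image ships neither Java/spark-submit nor the
# apache-spark provider that SparkSubmitOperator needs; extend the image for those.
# Initialize the metadata database once before the first start:
#   docker compose run --rm airflow-webserver db migrate
# then create a UI login with `airflow users create`.
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.7.0
  depends_on:
    - postgres
  environment:
    # LocalExecutor runs tasks in the scheduler container (no Celery broker/workers needed)
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
  volumes:
    - ../airflow/dags:/opt/airflow/dags
    - ../airflow/plugins:/opt/airflow/plugins
    - ../dbt:/opt/dbt
    - ../spark/jobs:/opt/spark/jobs

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres_data:/var/lib/postgresql/data

  airflow-webserver:
    <<: *airflow-common
    ports:
      - "8080:8080"
    command: webserver

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

  spark-master:
    image: bitnami/spark:3.4
    environment:
      - SPARK_MODE=master
    ports:
      - "7077:7077"
      - "8081:8080"
    volumes:
      - ../spark/jobs:/opt/spark/jobs

  spark-worker:
    image: bitnami/spark:3.4
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
    depends_on:
      - spark-master

volumes:
  postgres_data: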

Practice Problems

Problem 1: Extend Pipeline

Add a streaming pipeline that processes real-time inventory events from Kafka and sends low-stock alerts.

Problem 2: Quality Dashboard

Visualize daily data quality scores in a Grafana dashboard.

Problem 3: Cost Optimization

Optimize Spark partition count and resource settings for large-scale data processing.


Summary

Key integrations covered in this project:

Tool                  Role
Airflow               Pipeline orchestration
Spark                 Large-scale data processing
dbt                   SQL-based transformation
Great Expectations    Data quality validation
Data Lake             Layered storage
