Data Quality and Governance

Overview

Data quality ensures the accuracy, completeness, and consistency of data, while data governance is a framework for systematically managing data assets. Both are essential for building trustworthy data pipelines.


1. Data Quality Dimensions

1.1 Quality Dimension Definitions

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                   6 Dimensions of Data Quality                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                β”‚
β”‚   1. Accuracy                                                  β”‚
β”‚      - Does the data correctly reflect the actual value?       β”‚
β”‚      - Example: Is the customer email in a valid format?       β”‚
β”‚                                                                β”‚
β”‚   2. Completeness                                              β”‚
β”‚      - Is all necessary data present?                          β”‚
β”‚      - Example: Are there no NULLs in required fields?         β”‚
β”‚                                                                β”‚
β”‚   3. Consistency                                               β”‚
β”‚      - Is the data consistent across systems?                  β”‚
β”‚      - Example: Do order counts match between order table      β”‚
β”‚        and aggregate table?                                    β”‚
β”‚                                                                β”‚
β”‚   4. Timeliness                                                β”‚
β”‚      - Is data provided within an appropriate timeframe?       β”‚
β”‚      - Example: Is the real-time dashboard updated within      β”‚
β”‚        5 minutes?                                              β”‚
β”‚                                                                β”‚
β”‚   5. Uniqueness                                                β”‚
β”‚      - Is there no duplicate data?                             β”‚
β”‚      - Example: Is the same order not recorded multiple times? β”‚
β”‚                                                                β”‚
β”‚   6. Validity                                                  β”‚
β”‚      - Does the data comply with defined rules?                β”‚
β”‚      - Example: Is the date in the correct format?             β”‚
β”‚                                                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1.2 Quality Metrics Example

from dataclasses import dataclass
import pandas as pd

@dataclass
class DataQualityMetrics:
    """Data quality metrics"""
    table_name: str
    row_count: int
    null_count: dict[str, int]
    duplicate_count: int
    freshness_hours: float
    schema_valid: bool

def calculate_quality_metrics(df: pd.DataFrame, table_name: str) -> DataQualityMetrics:
    """Calculate quality metrics"""

    # Completeness: count of NULLs per column
    null_count = {col: int(df[col].isna().sum()) for col in df.columns}

    # Uniqueness: count of fully duplicated rows
    duplicate_count = int(df.duplicated().sum())

    return DataQualityMetrics(
        table_name=table_name,
        row_count=len(df),
        null_count=null_count,
        duplicate_count=duplicate_count,
        freshness_hours=0,  # Requires separate calculation
        schema_valid=True    # Requires separate validation
    )


def quality_score(metrics: DataQualityMetrics) -> float:
    """Calculate 0-100 quality score"""
    scores = []

    # Completeness score (NULL ratio)
    total_cells = metrics.row_count * len(metrics.null_count)
    total_nulls = sum(metrics.null_count.values())
    completeness = (1 - total_nulls / total_cells) * 100 if total_cells > 0 else 100
    scores.append(completeness)

    # Uniqueness score (duplicate ratio)
    uniqueness = (1 - metrics.duplicate_count / metrics.row_count) * 100 if metrics.row_count > 0 else 100
    scores.append(uniqueness)

    return sum(scores) / len(scores)
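
A quick usage sketch of the two helpers above, with made-up data:

sample = pd.DataFrame({
    "order_id": [1, 2, 2, 4],
    "amount": [10.0, 25.5, 25.5, None],
})

metrics = calculate_quality_metrics(sample, table_name="orders")
print(metrics.null_count)        # one NULL in 'amount'
print(metrics.duplicate_count)   # one fully duplicated row
print(f"score: {quality_score(metrics):.1f}")  # 81.2 = mean of 87.5 (completeness) and 75.0 (uniqueness)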

2. Great Expectations

2.1 Installation and Initialization

# Installation
pip install great_expectations

# Project initialization
great_expectations init

2.2 Basic Usage

import great_expectations as gx
import pandas as pd

# Create Context
context = gx.get_context()

# Add data source
datasource = context.sources.add_pandas("my_datasource")

# Define data asset
data_asset = datasource.add_dataframe_asset(name="orders")

# Load DataFrame
df = pd.read_csv("orders.csv")

# Batch Request
batch_request = data_asset.build_batch_request(dataframe=df)

# Create Expectation Suite
suite = context.add_expectation_suite("orders_suite")

# Create Validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)

2.3 Define Expectations

# Basic Expectations

# No NULLs
validator.expect_column_values_to_not_be_null("order_id")

# Unique
validator.expect_column_values_to_be_unique("order_id")

# Value range
validator.expect_column_values_to_be_between(
    "amount",
    min_value=0,
    max_value=1000000
)

# Allowed value list
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "completed", "cancelled", "refunded"]
)

# Regex matching
validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)

# Table row count
validator.expect_table_row_count_to_be_between(
    min_value=1000,
    max_value=1000000
)

# Column existence
validator.expect_table_columns_to_match_set(
    ["order_id", "customer_id", "amount", "status", "order_date"]
)

# Date format
validator.expect_column_values_to_match_strftime_format(
    "order_date",
    "%Y-%m-%d"
)

# Referential integrity (other table)
validator.expect_column_values_to_be_in_set(
    "customer_id",
    customer_ids_list  # List of IDs from customer table
)

# Save Suite
validator.save_expectation_suite(discard_failed_expectations=False)

2.4 Run Validation

# Create and run Checkpoint
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "orders_suite"
        }
    ]
)

# Run validation
result = checkpoint.run()

# Check results
print(f"Success: {result.success}")
print(f"Statistics: {result.statistics}")

# Check failed Expectations
for validation_result in result.list_validation_results():
    for exp_result in validation_result.results:
        if not exp_result.success:
            print(f"Failed: {exp_result.expectation_config.expectation_type}")
            print(f"  Column: {exp_result.expectation_config.kwargs.get('column')}")
            print(f"  Result: {exp_result.result}")

2.5 Generate Data Docs

# Build and open Data Docs
context.build_data_docs()
context.open_data_docs()

3. Airflow Integration

3.1 Great Expectations Operator

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import great_expectations as gx

def validate_data(**kwargs):
    """Great Expectations validation Task"""
    context = gx.get_context()

    # Run Checkpoint
    result = context.run_checkpoint(
        checkpoint_name="orders_checkpoint"
    )

    if not result.success:
        raise ValueError("Data quality check failed!")

    return result.statistics


with DAG(
    'data_quality_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
) as dag:

    validate = PythonOperator(
        task_id='validate_orders',
        python_callable=validate_data,
    )
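
If the community airflow-provider-great-expectations package is installed, a dedicated operator can replace the PythonOperator wrapper above. A minimal sketch; the project path is an assumption and argument names can vary between provider versions:

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

validate_ge = GreatExpectationsOperator(
    task_id="validate_orders_ge",
    data_context_root_dir="/opt/airflow/great_expectations",  # assumed GX project location
    checkpoint_name="orders_checkpoint",
    fail_task_on_validation_failure=True,  # fail the Airflow task when any expectation fails
)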

3.2 Custom Quality Checks

from airflow.operators.python import PythonOperator, BranchPythonOperator

def check_row_count(**kwargs):
    """Row count validation"""
    import pandas as pd

    df = pd.read_parquet(f"/data/{kwargs['ds']}/orders.parquet")
    row_count = len(df)

    # Store metric in XCom
    kwargs['ti'].xcom_push(key='row_count', value=row_count)

    if row_count < 1000:
        raise ValueError(f"Row count too low: {row_count}")

    return row_count


def check_freshness(**kwargs):
    """Data freshness validation"""
    from datetime import datetime, timedelta

    # Check file modification time
    import os
    file_path = f"/data/{kwargs['ds']}/orders.parquet"
    mtime = datetime.fromtimestamp(os.path.getmtime(file_path))

    age_hours = (datetime.now() - mtime).total_seconds() / 3600

    if age_hours > 24:
        raise ValueError(f"Data too old: {age_hours:.1f} hours")

    return age_hours


def decide_next_step(**kwargs):
    """Branching based on quality results"""
    ti = kwargs['ti']
    row_count = ti.xcom_pull(task_ids='check_row_count', key='row_count')

    if row_count > 10000:
        return 'process_large_batch'
    else:
        return 'process_small_batch'


with DAG('quality_checks_dag', ...) as dag:

    check_rows = PythonOperator(
        task_id='check_row_count',
        python_callable=check_row_count,
    )

    check_fresh = PythonOperator(
        task_id='check_freshness',
        python_callable=check_freshness,
    )

    branch = BranchPythonOperator(
        task_id='decide_processing',
        python_callable=decide_next_step,
    )

    [check_rows, check_fresh] >> branch

4. Data Catalog

4.1 Catalog Concept

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Data Catalog                                 β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                β”‚
β”‚   Metadata Management System:                                  β”‚
β”‚                                                                β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚   β”‚  Technical Metadata                                     β”‚  β”‚
β”‚   β”‚  - Schema, data types, partitions                       β”‚  β”‚
β”‚   β”‚  - Location, format, size                               β”‚  β”‚
β”‚   β”‚  - Creation date, modification date                     β”‚  β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                                                                β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚   β”‚  Business Metadata                                      β”‚  β”‚
β”‚   β”‚  - Description, definition, terminology                 β”‚  β”‚
β”‚   β”‚  - Owner, administrator                                 β”‚  β”‚
β”‚   β”‚  - Tags, classification                                 β”‚  β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                                                                β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚   β”‚  Operational Metadata                                   β”‚  β”‚
β”‚   β”‚  - Usage frequency, query patterns                      β”‚  β”‚
β”‚   β”‚  - Quality score, issues                                β”‚  β”‚
β”‚   β”‚  - Access permissions                                   β”‚  β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                                                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

4.2 Catalog Tools

Tool           Type           Features
-------------  -------------  ----------------------------------------
DataHub        Open source    Developed by LinkedIn, general purpose
Apache Atlas   Open source    Hadoop ecosystem
Amundsen       Open source    Developed by Lyft, search-focused
OpenMetadata   Open source    All-in-one platform
Atlan          Commercial     Collaboration-focused
Alation        Commercial     Enterprise-oriented

4.3 DataHub Example

# DataHub metadata collection example
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    NumberTypeClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

# Create Emitter
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Dataset URN
dataset_urn = make_dataset_urn(
    platform="postgres",
    name="analytics.public.fact_orders",
    env="PROD"
)

# Dataset properties
properties = DatasetPropertiesClass(
    description="Orders fact table",
    customProperties={
        "owner": "data-team@company.com",
        "sla": "daily",
        "pii": "false"
    }
)

# Schema definition
schema = SchemaMetadataClass(
    schemaName="fact_orders",
    platform=make_data_platform_urn("postgres"),
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema=""),
    fields=[
        SchemaFieldClass(
            fieldPath="order_id",
            type=SchemaFieldDataTypeClass(type=StringTypeClass()),
            nativeDataType="varchar",
            description="Unique order ID"
        ),
        SchemaFieldClass(
            fieldPath="amount",
            type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
            nativeDataType="numeric",
            description="Order amount"
        ),
    ]
)

# Emit each aspect as a metadata change proposal against the dataset URN
for aspect in (properties, schema):
    emitter.emit(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=aspect))

5. Data Lineage

5.1 Lineage Concept

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     Data Lineage                                β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                β”‚
β”‚   Track data origin and transformation process:                β”‚
β”‚                                                                β”‚
β”‚   Raw Sources          Staging           Marts                 β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”‚
β”‚   β”‚ orders   │───────→│stg_orders│─────→│fct_ordersβ”‚          β”‚
β”‚   β”‚ (raw)    β”‚        β”‚          β”‚      β”‚          β”‚          β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜          β”‚
β”‚                                               β”‚                β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”           β”‚                β”‚
β”‚   β”‚customers │───────→│stg_customers│────────→│                β”‚
β”‚   β”‚ (raw)    β”‚        β”‚          β”‚           β”‚                β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜           β”‚                β”‚
β”‚                                               ↓                β”‚
β”‚                                         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”‚
β”‚                                         β”‚ dashboardβ”‚          β”‚
β”‚                                         β”‚ (BI)     β”‚          β”‚
β”‚                                         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜          β”‚
β”‚                                                                β”‚
β”‚   Uses:                                                        β”‚
β”‚   - Impact analysis: Identify affected targets when source     β”‚
β”‚     changes                                                    β”‚
β”‚   - Root cause analysis: Track source of data issues           β”‚
β”‚   - Compliance: Audit data flows                               β”‚
β”‚                                                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

5.2 dbt Lineage

# Generate dbt lineage
dbt docs generate

# View lineage (docs server)
dbt docs serve

# dbt model metadata (schema.yml). Note: dbt derives lineage automatically
# from ref() calls in model SQL; the meta block below is descriptive only.
version: 2

models:
  - name: fct_orders
    description: "Orders fact table"
    meta:
      owner: "data-team"
      upstream:
        - stg_orders
        - stg_customers
      downstream:
        - sales_dashboard
        - ml_model_features

5.3 OpenLineage

# Lineage tracking using OpenLineage
from openlineage.client import OpenLineageClient
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState
from openlineage.client.facet import (
    SchemaDatasetFacet,
    SchemaField,
)
from datetime import datetime
import uuid

client = OpenLineageClient(url="http://localhost:5000")

# Job definition
job = Job(
    namespace="my_pipeline",
    name="transform_orders"
)

# Start Run
run_id = str(uuid.uuid4())
run = Run(runId=run_id)

# Input datasets
input_datasets = [
    Dataset(
        namespace="postgres",
        name="raw.orders",
        facets={
            "schema": SchemaDatasetFacet(
                fields=[
                    SchemaField(name="order_id", type="string"),
                    SchemaField(name="amount", type="decimal"),
                ]
            )
        },
    )
]

# Output datasets
output_datasets = [
    Dataset(namespace="postgres", name="analytics.fct_orders"),
]

# Start event
client.emit(
    RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now().isoformat(),
        run=run,
        job=job,
        inputs=input_datasets,
        outputs=output_datasets,
    )
)

# ... actual transformation work ...

# Complete event
client.emit(
    RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now().isoformat(),
        run=run,
        job=job,
    )
)
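
If the transformation raises an exception, a FAIL event can be emitted instead so the failure is visible in the lineage graph. A minimal sketch reusing the run and job defined above; run_transformation is a hypothetical placeholder for the actual work:

try:
    run_transformation()  # hypothetical placeholder for the transformation step
except Exception:
    client.emit(RunEvent(
        eventType=RunState.FAIL,
        eventTime=datetime.now().isoformat(),
        run=run, job=job,
    ))
    raise
else:
    client.emit(RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now().isoformat(),
        run=run, job=job,
    ))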

6. Governance Framework

6.1 Data Governance Components

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                 Data Governance Framework                       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                β”‚
β”‚   1. Organization                                              β”‚
β”‚      - Designate data stewards                                 β”‚
β”‚      - Define roles and responsibilities                       β”‚
β”‚      - Governance committee                                    β”‚
β”‚                                                                β”‚
β”‚   2. Policies                                                  β”‚
β”‚      - Data classification policy                              β”‚
β”‚      - Access control policy                                   β”‚
β”‚      - Retention/deletion policy                               β”‚
β”‚      - Quality standards                                       β”‚
β”‚                                                                β”‚
β”‚   3. Processes                                                 β”‚
β”‚      - Data request/approval process                           β”‚
β”‚      - Issue management process                                β”‚
β”‚      - Change management process                               β”‚
β”‚                                                                β”‚
β”‚   4. Technology                                                β”‚
β”‚      - Data catalog                                            β”‚
β”‚      - Quality monitoring                                      β”‚
β”‚      - Access control systems                                  β”‚
β”‚      - Audit logs                                              β”‚
β”‚                                                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

6.2 Data Classification

from enum import Enum

class DataClassification(Enum):
    """Data sensitivity classification"""
    PUBLIC = "public"           # Publicly available
    INTERNAL = "internal"       # Internal use
    CONFIDENTIAL = "confidential"  # Confidential
    RESTRICTED = "restricted"   # Restricted (PII, financial)

class DataClassifier:
    """Automatic data classification"""

    PII_PATTERNS = {
        'email': r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+',
        'phone': r'\d{3}-\d{3,4}-\d{4}',
        'ssn': r'\d{3}-\d{2}-\d{4}',
        'credit_card': r'\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}',
    }

    PII_COLUMN_NAMES = [
        'email', 'phone', 'ssn', 'social_security',
        'credit_card', 'password', 'address'
    ]

    @classmethod
    def classify_column(cls, column_name: str, sample_values: list) -> DataClassification:
        """Classify column"""
        column_lower = column_name.lower()

        # Column name-based classification
        if any(pii in column_lower for pii in cls.PII_COLUMN_NAMES):
            return DataClassification.RESTRICTED

        # Value pattern-based classification
        import re
        for value in sample_values[:100]:  # Check up to 100 sample values
            if value is None:
                continue
            for pii_type, pattern in cls.PII_PATTERNS.items():
                if re.search(pattern, str(value)):
                    return DataClassification.RESTRICTED

        return DataClassification.INTERNAL
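
A usage sketch over a pandas DataFrame; the column names and values are illustrative:

import pandas as pd

df = pd.DataFrame({
    "customer_email": ["alice@example.com", "bob@example.com"],
    "order_total": [120.5, 89.9],
})

for col in df.columns:
    label = DataClassifier.classify_column(col, df[col].tolist())
    print(f"{col}: {label.value}")
# customer_email: restricted  (column name contains 'email')
# order_total: internal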

Practice Problems

Problem 1: Great Expectations

Write an Expectation Suite for order data (NULL check, unique, value range, referential integrity).

Problem 2: Quality Dashboard

Design a pipeline that calculates and visualizes daily data quality scores.

Problem 3: Lineage Tracking

Design a system that automatically tracks lineage in an ETL pipeline.


Summary

Concept              Description
-------------------  -----------------------------------------------------------
Data Quality         Ensuring accuracy, completeness, consistency, timeliness
Great Expectations   Python-based data quality validation framework
Data Catalog         Metadata management system for data assets
Data Lineage         Tracking data origin and transformation steps
Data Governance      Framework for systematically managing data assets
