Data Quality and Governance
Data Quality and Governance¶
Overview¶
Data quality ensures the accuracy, completeness, and consistency of data, while data governance is a framework for systematically managing data assets. Both are essential for building trustworthy data pipelines.
1. Data Quality Dimensions¶
1.1 Quality Dimension Definitions¶
──────────────────────────────────────────────────────────────────
 6 Dimensions of Data Quality
──────────────────────────────────────────────────────────────────

 1. Accuracy
    - Does the data correctly reflect the actual value?
    - Example: Is the customer email in a valid format?

 2. Completeness
    - Is all necessary data present?
    - Example: Are there no NULLs in required fields?

 3. Consistency
    - Is the data consistent across systems?
    - Example: Do order counts match between the order table
      and the aggregate table?

 4. Timeliness
    - Is data provided within an appropriate timeframe?
    - Example: Is the real-time dashboard updated within
      5 minutes?

 5. Uniqueness
    - Is there no duplicate data?
    - Example: Is the same order not recorded multiple times?

 6. Validity
    - Does the data comply with defined rules?
    - Example: Is the date in the correct format?
──────────────────────────────────────────────────────────────────
1.2 Quality Metrics Example¶
from dataclasses import dataclass
from typing import Optional
import pandas as pd
@dataclass
class DataQualityMetrics:
    """Data quality metrics for a single table snapshot.

    Attributes:
        table_name: Name of the table the metrics describe.
        row_count: Total number of rows.
        null_count: Per-column count of NULL (NaN) values.
        duplicate_count: Number of fully duplicated rows.
        freshness_hours: Age of the data in hours (0 if not yet computed).
        schema_valid: Whether the table matched its expected schema.
    """
    table_name: str
    row_count: int
    null_count: dict[str, int]
    duplicate_count: int
    freshness_hours: float
    schema_valid: bool


def calculate_quality_metrics(df: pd.DataFrame, table_name: str) -> DataQualityMetrics:
    """Calculate completeness and uniqueness metrics for *df*.

    freshness_hours and schema_valid are placeholders here: both require
    information (file timestamps, expected schema) that is not available
    from the DataFrame alone.
    """
    # Completeness: NULL count per column.
    # Cast to int: pandas .sum() returns numpy.int64, but the dataclass
    # declares plain Python int.
    null_count = {col: int(df[col].isna().sum()) for col in df.columns}

    # Uniqueness: number of rows that are exact duplicates of an earlier row.
    duplicate_count = int(df.duplicated().sum())

    return DataQualityMetrics(
        table_name=table_name,
        row_count=len(df),
        null_count=null_count,
        duplicate_count=duplicate_count,
        freshness_hours=0,   # Requires separate calculation
        schema_valid=True    # Requires separate validation
    )
def quality_score(metrics: DataQualityMetrics) -> float:
    """Return an overall 0-100 quality score for *metrics*.

    The score is the mean of a completeness score (share of non-NULL
    cells) and a uniqueness score (share of non-duplicated rows).
    Empty tables score 100 on both dimensions.
    """
    cell_total = metrics.row_count * len(metrics.null_count)
    null_total = sum(metrics.null_count.values())

    # Completeness: fraction of populated cells, as a percentage.
    if cell_total > 0:
        completeness = (1 - null_total / cell_total) * 100
    else:
        completeness = 100

    # Uniqueness: fraction of non-duplicate rows, as a percentage.
    if metrics.row_count > 0:
        uniqueness = (1 - metrics.duplicate_count / metrics.row_count) * 100
    else:
        uniqueness = 100

    return (completeness + uniqueness) / 2
2. Great Expectations¶
2.1 Installation and Initialization¶
# Installation
pip install great_expectations
# Project initialization
great_expectations init
2.2 Basic Usage¶
import great_expectations as gx
import pandas as pd

# NOTE(review): this snippet uses the GX "fluent" API
# (context.sources.add_pandas), present in Great Expectations 0.16-0.18;
# GX 1.x reorganized these entry points -- confirm the installed version.

# Create Context (loads the project config, or an ephemeral context)
context = gx.get_context()

# Add data source
datasource = context.sources.add_pandas("my_datasource")

# Define data asset
data_asset = datasource.add_dataframe_asset(name="orders")

# Load DataFrame
df = pd.read_csv("orders.csv")

# Batch Request: binds the in-memory DataFrame to the asset
batch_request = data_asset.build_batch_request(dataframe=df)

# Create Expectation Suite
suite = context.add_expectation_suite("orders_suite")

# Create Validator: evaluates expectations against the batch
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)
2.3 Define Expectations¶
# Basic Expectations: each call records a rule on the validator and
# immediately evaluates it against the current batch.

# No NULLs
validator.expect_column_values_to_not_be_null("order_id")

# Unique
validator.expect_column_values_to_be_unique("order_id")

# Value range
validator.expect_column_values_to_be_between(
    "amount",
    min_value=0,
    max_value=1000000
)

# Allowed value list
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "completed", "cancelled", "refunded"]
)

# Regex matching
validator.expect_column_values_to_match_regex(
    "email",
    r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)

# Table row count
validator.expect_table_row_count_to_be_between(
    min_value=1000,
    max_value=1000000
)

# Column existence (exact set of columns, order-insensitive)
validator.expect_table_columns_to_match_set(
    ["order_id", "customer_id", "amount", "status", "order_date"]
)

# Date format
validator.expect_column_values_to_match_strftime_format(
    "order_date",
    "%Y-%m-%d"
)

# Referential integrity (other table)
# NOTE(review): customer_ids_list is not defined in this snippet; it
# must be loaded from the customer table beforehand.
validator.expect_column_values_to_be_in_set(
    "customer_id",
    customer_ids_list  # List of IDs from customer table
)

# Save Suite; discard_failed_expectations=False also keeps the
# expectations that failed against this sample batch.
validator.save_expectation_suite(discard_failed_expectations=False)
2.4 Run Validation¶
# Create and run Checkpoint: a reusable bundle of (batch, suite) pairs
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "orders_suite"
        }
    ]
)

# Run validation
result = checkpoint.run()

# Check results: overall pass/fail plus aggregate counts
print(f"Success: {result.success}")
print(f"Statistics: {result.statistics}")

# Check failed Expectations: walk each validation run and report only
# the expectations that did not pass.
for validation_result in result.list_validation_results():
    for exp_result in validation_result.results:
        if not exp_result.success:
            print(f"Failed: {exp_result.expectation_config.expectation_type}")
            print(f"  Column: {exp_result.expectation_config.kwargs.get('column')}")
            print(f"  Result: {exp_result.result}")

# Build and open Data Docs (static HTML report of suites and runs)
context.build_data_docs()
context.open_data_docs()
3. Airflow Integration¶
3.1 Great Expectations Operator¶
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import great_expectations as gx
def validate_data(**kwargs):
    """Run the 'orders_checkpoint' Great Expectations checkpoint.

    Raises:
        ValueError: If any expectation in the checkpoint fails, which
            marks the Airflow task as failed.
    """
    context = gx.get_context()

    # Run Checkpoint
    # NOTE(review): context.run_checkpoint is the pre-1.0 GX API --
    # confirm against the installed Great Expectations version.
    result = context.run_checkpoint(
        checkpoint_name="orders_checkpoint"
    )

    if not result.success:
        raise ValueError("Data quality check failed!")

    # The returned value is pushed to XCom by the PythonOperator.
    return result.statistics


with DAG(
    'data_quality_dag',
    start_date=datetime(2024, 1, 1),
    # NOTE(review): schedule_interval is deprecated in Airflow 2.4+ in
    # favor of `schedule` -- confirm the target Airflow version.
    schedule_interval='@daily',
) as dag:
    validate = PythonOperator(
        task_id='validate_orders',
        python_callable=validate_data,
    )
3.2 Custom Quality Checks¶
from airflow.operators.python import PythonOperator, BranchPythonOperator
def check_row_count(**kwargs):
    """Fail the task if the day's orders file has fewer than 1000 rows.

    The observed row count is pushed to XCom (key='row_count') so that
    downstream tasks can branch on it before this check may raise.
    """
    import pandas as pd

    frame = pd.read_parquet(f"/data/{kwargs['ds']}/orders.parquet")
    n_rows = len(frame)

    # Publish the metric for downstream branching tasks.
    kwargs['ti'].xcom_push(key='row_count', value=n_rows)

    if n_rows < 1000:
        raise ValueError(f"Row count too low: {n_rows}")
    return n_rows
def check_freshness(**kwargs):
    """Fail the task if the day's orders file is older than 24 hours.

    Freshness is derived from the file's modification time, so this
    assumes the file is rewritten whenever the data is refreshed.
    """
    from datetime import datetime, timedelta
    import os

    target = f"/data/{kwargs['ds']}/orders.parquet"
    modified_at = datetime.fromtimestamp(os.path.getmtime(target))
    age_hours = (datetime.now() - modified_at).total_seconds() / 3600

    if age_hours > 24:
        raise ValueError(f"Data too old: {age_hours:.1f} hours")
    return age_hours
def decide_next_step(**kwargs):
    """Pick the downstream task id based on the published row count.

    Reads the 'row_count' metric pushed by the 'check_row_count' task
    and routes large batches to the heavyweight processing path.
    """
    count = kwargs['ti'].xcom_pull(task_ids='check_row_count', key='row_count')
    return 'process_large_batch' if count > 10000 else 'process_small_batch'
# NOTE: the '...' below is a placeholder for the remaining DAG arguments
# (start_date, schedule, etc.) omitted for brevity; it is not runnable
# as-is.
with DAG('quality_checks_dag', ...) as dag:
    check_rows = PythonOperator(
        task_id='check_row_count',
        python_callable=check_row_count,
    )
    check_fresh = PythonOperator(
        task_id='check_freshness',
        python_callable=check_freshness,
    )
    branch = BranchPythonOperator(
        task_id='decide_processing',
        python_callable=decide_next_step,
    )

    # Both quality checks must succeed before the branching decision runs.
    [check_rows, check_fresh] >> branch
4. Data Catalog¶
4.1 Catalog Concept¶
──────────────────────────────────────────────────────────────────
 Data Catalog — Metadata Management System
──────────────────────────────────────────────────────────────────

 Technical Metadata
   - Schema, data types, partitions
   - Location, format, size
   - Creation date, modification date

 Business Metadata
   - Description, definition, terminology
   - Owner, administrator
   - Tags, classification

 Operational Metadata
   - Usage frequency, query patterns
   - Quality score, issues
   - Access permissions
──────────────────────────────────────────────────────────────────
4.2 Catalog Tools¶
| Tool | Type | Features |
|---|---|---|
| DataHub | Open Source | Developed by LinkedIn, general purpose |
| Apache Atlas | Open Source | Hadoop ecosystem |
| Amundsen | Open Source | Developed by Lyft, search-focused |
| OpenMetadata | Open Source | All-in-one platform |
| Atlan | Commercial | Collaboration-focused |
| Alation | Commercial | Enterprise |
4.3 DataHub Example¶
# DataHub metadata collection example
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
SchemaMetadataClass,
SchemaFieldClass,
StringTypeClass,
NumberTypeClass,
)
# Create Emitter
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
# Dataset URN
dataset_urn = make_dataset_urn(
platform="postgres",
name="analytics.public.fact_orders",
env="PROD"
)
# Dataset properties
properties = DatasetPropertiesClass(
description="Orders fact table",
customProperties={
"owner": "data-team@company.com",
"sla": "daily",
"pii": "false"
}
)
# Schema definition
schema = SchemaMetadataClass(
schemaName="fact_orders",
platform=f"urn:li:dataPlatform:postgres",
fields=[
SchemaFieldClass(
fieldPath="order_id",
type=StringTypeClass(),
description="Unique order ID"
),
SchemaFieldClass(
fieldPath="amount",
type=NumberTypeClass(),
description="Order amount"
),
]
)
# Emit metadata
emitter.emit_mce(properties)
emitter.emit_mce(schema)
5. Data Lineage¶
5.1 Lineage Concept¶
──────────────────────────────────────────────────────────────────
 Data Lineage — track data origin and transformation process
──────────────────────────────────────────────────────────────────

 Raw Sources           Staging              Marts

 orders (raw)    ───► stg_orders    ───┐
                                       ├──► fct_orders ───► dashboard (BI)
 customers (raw) ───► stg_customers ───┘

 Uses:
   - Impact analysis: identify affected targets when a source changes
   - Root cause analysis: track the source of data issues
   - Compliance: audit data flows
──────────────────────────────────────────────────────────────────
5.2 dbt Lineage¶
# Generate dbt lineage
dbt docs generate
# View lineage (docs server)
dbt docs serve
# dbt model metadata
version: 2
models:
- name: fct_orders
description: "Orders fact table"
meta:
owner: "data-team"
upstream:
- stg_orders
- stg_customers
downstream:
- sales_dashboard
- ml_model_features
5.3 OpenLineage¶
# Lineage tracking using OpenLineage
from openlineage.client import OpenLineageClient
from openlineage.client.run import Run, Job, RunEvent, RunState
from openlineage.client.facet import (
SqlJobFacet,
SchemaDatasetFacet,
SchemaField,
)
from datetime import datetime
import uuid
client = OpenLineageClient(url="http://localhost:5000")
# Job definition
job = Job(
namespace="my_pipeline",
name="transform_orders"
)
# Start Run
run_id = str(uuid.uuid4())
run = Run(runId=run_id)
# Input datasets
input_datasets = [
{
"namespace": "postgres",
"name": "raw.orders",
"facets": {
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name="order_id", type="string"),
SchemaField(name="amount", type="decimal"),
]
)
}
}
]
# Output datasets
output_datasets = [
{
"namespace": "postgres",
"name": "analytics.fct_orders",
}
]
# Start event
client.emit(
RunEvent(
eventType=RunState.START,
eventTime=datetime.now().isoformat(),
run=run,
job=job,
inputs=input_datasets,
outputs=output_datasets,
)
)
# ... actual transformation work ...
# Complete event
client.emit(
RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now().isoformat(),
run=run,
job=job,
)
)
6. Governance Framework¶
6.1 Data Governance Components¶
──────────────────────────────────────────────────────────────────
 Data Governance Framework
──────────────────────────────────────────────────────────────────

 1. Organization
    - Designate data stewards
    - Define roles and responsibilities
    - Governance committee

 2. Policies
    - Data classification policy
    - Access control policy
    - Retention/deletion policy
    - Quality standards

 3. Processes
    - Data request/approval process
    - Issue management process
    - Change management process

 4. Technology
    - Data catalog
    - Quality monitoring
    - Access control systems
    - Audit logs
──────────────────────────────────────────────────────────────────
6.2 Data Classification¶
import re
from enum import Enum


class DataClassification(Enum):
    """Data sensitivity classification levels."""
    PUBLIC = "public"              # Publicly available
    INTERNAL = "internal"          # Internal use
    CONFIDENTIAL = "confidential"  # Confidential
    RESTRICTED = "restricted"      # Restricted (PII, financial)


class DataClassifier:
    """Heuristic, automatic data classification for table columns.

    A column is RESTRICTED when its name suggests PII or a sampled value
    matches a known PII pattern; otherwise it defaults to INTERNAL.
    (PUBLIC / CONFIDENTIAL must be assigned by a human reviewer.)
    """

    # Value-level PII regexes. NOTE: heuristics only -- e.g. the phone
    # pattern assumes xxx-xxx(x)-xxxx style numbers and will miss other
    # formats.
    PII_PATTERNS = {
        'email': r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+',
        'phone': r'\d{3}-\d{3,4}-\d{4}',
        'ssn': r'\d{3}-\d{2}-\d{4}',
        'credit_card': r'\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}',
    }

    # Column names (substring match, case-insensitive) that imply PII
    # regardless of content.
    PII_COLUMN_NAMES = [
        'email', 'phone', 'ssn', 'social_security',
        'credit_card', 'password', 'address'
    ]

    @classmethod
    def classify_column(cls, column_name: str, sample_values: list) -> DataClassification:
        """Classify a single column from its name and sampled values.

        Args:
            column_name: Name of the column.
            sample_values: Sample of the column's values; only the first
                100 entries are inspected, None entries are skipped.

        Returns:
            DataClassification.RESTRICTED if PII is detected, otherwise
            DataClassification.INTERNAL.
        """
        column_lower = column_name.lower()

        # 1. Name-based classification: any known PII token appearing as
        #    a substring of the column name.
        if any(pii in column_lower for pii in cls.PII_COLUMN_NAMES):
            return DataClassification.RESTRICTED

        # 2. Value-based classification. re.search (rather than re.match)
        #    so PII embedded mid-string (e.g. "tel: 555-123-4567") is
        #    also detected; re.match only anchors at the start.
        for value in sample_values[:100]:  # Bound the work per column
            if value is None:
                continue
            text = str(value)
            for pattern in cls.PII_PATTERNS.values():
                if re.search(pattern, text):
                    return DataClassification.RESTRICTED

        return DataClassification.INTERNAL
Practice Problems¶
Problem 1: Great Expectations¶
Write an Expectation Suite for order data (NULL check, unique, value range, referential integrity).
Problem 2: Quality Dashboard¶
Design a pipeline that calculates and visualizes daily data quality scores.
Problem 3: Lineage Tracking¶
Design a system that automatically tracks lineage in an ETL pipeline.
Summary¶
| Concept | Description |
|---|---|
| Data Quality | Ensuring accuracy, completeness, consistency, timeliness |
| Great Expectations | Python-based data quality framework |
| Data Catalog | Metadata management system |
| Data Lineage | Tracking data origin and transformations |
| Data Governance | Systematic management of data assets |