Data Engineering Overview
Data Engineering Overview¶
Introduction¶
Data Engineering is the field of designing and building systems that collect, store, process, and deliver organizational data. Data engineers build data pipelines that transform raw data into analyzable formats.
1. Role of a Data Engineer¶
1.1 Core Responsibilities¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Data Engineer Role β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β 1. Data Ingestion β
β - Extract data from various sources β
β - API, databases, files, streaming β
β β
β 2. Data Storage β
β - Design Data Lake, Data Warehouse β
β - Schema design and optimization β
β β
β 3. Data Transformation β
β - Build ETL/ELT pipelines β
β - Ensure data quality β
β β
β 4. Data Serving β
β - Provide data to analysts/scientists β
β - Integrate with BI tools, API, dashboards β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1.2 Data Engineer vs Data Scientist vs Data Analyst¶
| Role | Main Responsibilities | Required Skills |
|---|---|---|
| Data Engineer | Pipeline construction, infrastructure management | Python, SQL, Spark, Airflow, Kafka |
| Data Scientist | Model development, predictive analytics | Python, ML/DL, statistics, mathematics |
| Data Analyst | Business insight extraction | SQL, BI tools, visualization, statistics |
1.3 Essential Skills for Data Engineers¶
# Example data engineer tech stack
tech_stack = {
"programming": ["Python", "SQL", "Scala", "Java"],
"databases": ["PostgreSQL", "MySQL", "MongoDB", "Redis"],
"big_data": ["Spark", "Hadoop", "Flink", "Hive"],
"orchestration": ["Airflow", "Prefect", "Dagster"],
"streaming": ["Kafka", "Kinesis", "Pub/Sub"],
"cloud": ["AWS", "GCP", "Azure"],
"infrastructure": ["Docker", "Kubernetes", "Terraform"],
"storage": ["S3", "GCS", "HDFS", "Delta Lake"]
}
2. Data Pipeline Concepts¶
2.1 What is a Pipeline?¶
A data pipeline is a series of processing steps that move data from source to destination.
ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ
β Source β β β Extract β β βTransform β β β Load β
β β β β β β β β
ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ
β β β β
Database Raw Data Cleaned Data Warehouse
API, Files Staging Processed Analytics
2.2 Pipeline Components¶
# Simple pipeline example
from datetime import datetime
import pandas as pd
class DataPipeline:
"""Basic data pipeline class"""
def __init__(self, name: str):
self.name = name
self.start_time = None
self.end_time = None
def extract(self, source: str) -> pd.DataFrame:
"""Data extraction step"""
print(f"[{datetime.now()}] Extracting from {source}")
# In practice, extract data from DB, API, files, etc.
data = pd.read_csv(source)
return data
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Data transformation step"""
print(f"[{datetime.now()}] Transforming data")
# Data cleaning, transformation, aggregation, etc.
df = df.dropna() # Remove missing values
df['processed_at'] = datetime.now()
return df
def load(self, df: pd.DataFrame, destination: str):
"""Data loading step"""
print(f"[{datetime.now()}] Loading to {destination}")
# In practice, save to DB, files, cloud storage, etc.
df.to_parquet(destination, index=False)
def run(self, source: str, destination: str):
"""Execute entire pipeline"""
self.start_time = datetime.now()
print(f"Pipeline '{self.name}' started")
# ETL process
raw_data = self.extract(source)
transformed_data = self.transform(raw_data)
self.load(transformed_data, destination)
self.end_time = datetime.now()
duration = (self.end_time - self.start_time).seconds
print(f"Pipeline completed in {duration} seconds")
# Execute pipeline
if __name__ == "__main__":
pipeline = DataPipeline("daily_sales")
pipeline.run("sales_raw.csv", "sales_processed.parquet")
2.3 Pipeline Types¶
| Type | Description | Use Cases |
|---|---|---|
| Batch | Process large volumes of data at scheduled times | Daily reports, monthly aggregations |
| Streaming | Real-time data processing | Real-time dashboards, anomaly detection |
| Micro-batch | Small batches at short intervals | Near real-time analytics (5-15 min) |
| Event-driven | Process on specific event occurrence | Trigger-based processing |
3. Batch Processing vs Stream Processing¶
3.1 Batch Processing¶
# Batch processing example: Daily sales aggregation
from datetime import datetime, timedelta
import pandas as pd
def daily_sales_batch():
"""Daily sales batch processing"""
# 1. Extract yesterday's data
yesterday = datetime.now() - timedelta(days=1)
date_str = yesterday.strftime('%Y-%m-%d')
# 2. Extract data (simulation)
query = f"""
SELECT
product_id,
SUM(quantity) as total_quantity,
SUM(amount) as total_amount
FROM sales
WHERE DATE(created_at) = '{date_str}'
GROUP BY product_id
"""
# 3. Save aggregation results
print(f"Processing batch for {date_str}")
# df = execute_query(query)
# df.to_parquet(f"sales_summary_{date_str}.parquet")
return {"status": "success", "date": date_str}
# Batch processing characteristics
batch_characteristics = {
"latency": "High (minutes to hours)",
"throughput": "High (efficient for large volumes)",
"use_cases": ["Daily reports", "Weekly aggregations", "Data migration"],
"tools": ["Spark", "Airflow", "dbt", "AWS Glue"]
}
3.2 Stream Processing¶
# Stream processing example: Real-time event processing
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Any
import json
@dataclass
class Event:
"""Streaming event"""
event_type: str
data: dict
timestamp: datetime
class StreamProcessor:
"""Simple stream processor"""
def __init__(self):
self.handlers: dict[str, list[Callable]] = {}
def register_handler(self, event_type: str, handler: Callable):
"""Register event handler"""
if event_type not in self.handlers:
self.handlers[event_type] = []
self.handlers[event_type].append(handler)
def process(self, event: Event):
"""Process event"""
handlers = self.handlers.get(event.event_type, [])
for handler in handlers:
handler(event)
def consume(self, stream):
"""Consume events from stream (simulation)"""
for message in stream:
event = Event(
event_type=message['type'],
data=message['data'],
timestamp=datetime.now()
)
self.process(event)
# Handler examples
def log_handler(event: Event):
"""Event logging"""
print(f"[{event.timestamp}] {event.event_type}: {event.data}")
def alert_handler(event: Event):
"""Anomaly detection alert"""
if event.data.get('amount', 0) > 10000:
print(f"ALERT: High value transaction detected!")
# Streaming characteristics
streaming_characteristics = {
"latency": "Low (milliseconds to seconds)",
"throughput": "Medium (record-level)",
"use_cases": ["Real-time dashboards", "Anomaly detection", "Notifications"],
"tools": ["Kafka", "Flink", "Spark Streaming", "Kinesis"]
}
3.3 Batch vs Streaming Comparison¶
| Characteristic | Batch Processing | Stream Processing |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Data Throughput | Large volumes | Small/continuous |
| Complexity | Relatively simple | Relatively complex |
| Reprocessing | Easy | Difficult |
| Cost | Lower | Higher |
| Use Cases | Reports, aggregations | Real-time analytics, alerts |
4. Data Architecture Patterns¶
4.1 Traditional Data Warehouse Architecture¶
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Traditional Data Warehouse Architecture β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β βββββββββββ βββββββββββ βββββββββββββββββββββββββββ β
β β Source 1β β Source 2β β Source N β β
β β (ERP) β β (CRM) β β (Other) β β
β ββββββ¬βββββ ββββββ¬βββββ βββββββββββββ¬ββββββββββββββ β
β β β β β
β βββββββββββββββΌββββββββββββββββββββββ β
β β β
β βββββββββββββββββββ β
β β ETL Process β β
β β (Extract-Transform-Load) β
β ββββββββββ¬βββββββββ β
β β β
β βββββββββββββββββββ β
β β Data Warehouse β β
β β (Star Schema) β β
β ββββββββββ¬βββββββββ β
β β β
β βββββββββββββββββββ β
β β BI Tools β β
β β (Tableau, Power BI) β
β βββββββββββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
4.2 Modern Data Lake Architecture¶
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Modern Data Lake Architecture β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Sources β
β βββββββ βββββββ βββββββ βββββββ βββββββ β
β β API β β DB β β IoT β β Log β βFilesβ β
β ββββ¬βββ ββββ¬βββ ββββ¬βββ ββββ¬βββ ββββ¬βββ β
β βββββββββ΄ββββββββ΄ββββββββ΄ββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Data Lake β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β β β Bronze βββ Silver βββ Gold β β β
β β β Raw β β Cleaned β βCurated β β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ β
β βBI/Reportsβ β ML/AI β β Data Appsβ β API β β
β ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
4.3 Lambda Architecture¶
A hybrid architecture combining batch and streaming.
# Lambda architecture concept implementation
class LambdaArchitecture:
"""Lambda architecture: Batch + Streaming layers"""
def __init__(self):
self.batch_layer = BatchLayer()
self.speed_layer = SpeedLayer()
self.serving_layer = ServingLayer()
def ingest(self, data):
"""Data ingestion: Send to both layers simultaneously"""
# Batch layer (master dataset)
self.batch_layer.append(data)
# Speed layer (real-time processing)
self.speed_layer.process(data)
def query(self, params):
"""Query: Merge batch view + real-time view"""
batch_result = self.serving_layer.get_batch_view(params)
realtime_result = self.speed_layer.get_realtime_view(params)
return self.merge_views(batch_result, realtime_result)
class BatchLayer:
"""Batch layer: Process entire dataset"""
def append(self, data):
"""Append to master dataset"""
# Store immutable data (append-only)
pass
def compute_batch_views(self):
"""Compute batch views (periodic execution)"""
# Process entire data with MapReduce, Spark, etc.
pass
class SpeedLayer:
"""Speed layer: Real-time data processing"""
def process(self, data):
"""Real-time processing"""
# Stream processing (Kafka, Flink, etc.)
pass
def get_realtime_view(self, params):
"""Return real-time view"""
pass
class ServingLayer:
"""Serving layer: Query processing"""
def get_batch_view(self, params):
"""Return batch view"""
pass
4.4 Kappa Architecture¶
A simplified architecture using only streaming.
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Kappa Architecture β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β Sources β
β βββββββ βββββββ βββββββ β
β βEventβ βEventβ βEventβ β
β ββββ¬βββ ββββ¬βββ ββββ¬βββ β
β βββββββββ΄ββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββ β
β β Message Queue (Kafka) β β
β β - Event Log β β
β β - Replayable β β
β βββββββββββββββββββ¬ββββββββββββββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββ β
β β Stream Processing Layer β β
β β (Flink, Spark Streaming) β β
β βββββββββββββββββββ¬ββββββββββββββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββ β
β β Serving Layer β β
β β (Database, Cache, API) β β
β βββββββββββββββββββββββββββββββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
5. Data Engineering Tool Ecosystem¶
5.1 Major Tool Categories¶
data_engineering_tools = {
"orchestration": {
"batch": ["Apache Airflow", "Prefect", "Dagster", "Luigi"],
"streaming": ["Apache Kafka", "Apache Flink", "Spark Streaming"]
},
"processing": {
"batch": ["Apache Spark", "Apache Hive", "Presto/Trino"],
"streaming": ["Apache Kafka Streams", "Apache Flink", "Apache Storm"]
},
"storage": {
"data_lake": ["S3", "GCS", "HDFS", "Azure Blob"],
"data_warehouse": ["Snowflake", "BigQuery", "Redshift", "Databricks"],
"databases": ["PostgreSQL", "MySQL", "MongoDB", "Cassandra"]
},
"transformation": {
"sql_based": ["dbt", "SQLMesh"],
"code_based": ["PySpark", "Pandas", "Polars"]
},
"quality": {
"testing": ["Great Expectations", "dbt tests", "Soda"],
"monitoring": ["Monte Carlo", "Datadog", "Grafana"]
},
"catalog": ["Apache Atlas", "DataHub", "Amundsen", "OpenMetadata"]
}
5.2 Cloud Service Mapping¶
| Function | AWS | GCP | Azure |
|---|---|---|---|
| Orchestration | Step Functions, MWAA | Cloud Composer | Data Factory |
| Streaming | Kinesis | Pub/Sub, Dataflow | Event Hubs |
| Batch Processing | EMR, Glue | Dataproc, Dataflow | HDInsight |
| Data Lake | S3 + Lake Formation | GCS + BigLake | ADLS + Synapse |
| Data Warehouse | Redshift | BigQuery | Synapse Analytics |
6. Data Engineering Best Practices¶
6.1 Pipeline Design Principles¶
# Good pipeline design principles
pipeline_best_practices = {
"idempotency": "Same input produces same result",
"atomicity": "All succeed or all fail",
"incremental": "Ensure efficiency with incremental processing",
"monitoring": "Monitor at every stage",
"error_handling": "Retry and alert on failure",
"documentation": "Manage code and documentation together"
}
# Idempotency example
def idempotent_upsert(df, table_name, key_columns):
"""Upsert function ensuring idempotency"""
# Delete existing data then insert (MERGE or DELETE + INSERT)
delete_query = f"""
DELETE FROM {table_name}
WHERE (key1, key2) IN (
SELECT DISTINCT key1, key2 FROM staging_table
)
"""
# execute(delete_query)
# insert_dataframe(df, table_name)
pass
6.2 Error Handling and Retry¶
import time
from functools import wraps
from typing import Callable, Type
def retry(
max_attempts: int = 3,
delay: float = 1.0,
exceptions: tuple[Type[Exception], ...] = (Exception,)
):
"""Retry decorator"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_attempts:
print(f"Attempt {attempt} failed: {e}")
time.sleep(delay * attempt) # Exponential backoff
raise last_exception
return wrapper
return decorator
@retry(max_attempts=3, delay=2.0)
def fetch_data_from_api(url: str):
"""Fetch data from API (with retry)"""
import requests
response = requests.get(url, timeout=30)
response.raise_for_status()
return response.json()
Practice Problems¶
Problem 1: Pipeline Design¶
Design a pipeline that generates daily sales reports for an online shopping mall.
# Example solution
class DailySalesReportPipeline:
def extract(self):
"""Extract order, product, customer data"""
pass
def transform(self):
"""Sales aggregation, category analysis"""
pass
def load(self):
"""Load report table"""
pass
Problem 2: Batch vs Streaming Selection¶
Choose the appropriate approach (batch or streaming) for the following cases and explain why: - Daily sales report generation - Real-time low stock alerts - Monthly customer segmentation
Summary¶
| Concept | Description |
|---|---|
| Data Pipeline | Moving and transforming data from source to destination |
| Batch Processing | Periodically processing large volumes of data |
| Stream Processing | Processing data in real-time |
| Data Lake | Storage for raw data |
| Data Warehouse | Analytics storage for cleaned data |
| ETL/ELT | Extract, transform, load data process |