ETL vs ELT

ETL vs ELT

๊ฐœ์š”

ETL(Extract, Transform, Load)๊ณผ ELT(Extract, Load, Transform)๋Š” ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์˜ ๋‘ ๊ฐ€์ง€ ์ฃผ์š” ํŒจํ„ด์ž…๋‹ˆ๋‹ค. ์ „ํ†ต์ ์ธ ETL์€ ๋ณ€ํ™˜ ํ›„ ์ ์žฌํ•˜๊ณ , ๋ชจ๋˜ ELT๋Š” ์ ์žฌ ํ›„ ๋ณ€ํ™˜ํ•ฉ๋‹ˆ๋‹ค.


1. ETL (Extract, Transform, Load)

1.1 ETL ํ”„๋กœ์„ธ์Šค

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                      ETL Process                            โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”         โ”‚
โ”‚   โ”‚ Sources  โ”‚ โ†’ โ”‚ ETL Server   โ”‚ โ†’ โ”‚ Target   โ”‚          โ”‚
โ”‚   โ”‚          โ”‚    โ”‚              โ”‚    โ”‚ (DW)     โ”‚          โ”‚
โ”‚   โ”‚ - DB     โ”‚    โ”‚ 1. Extract   โ”‚    โ”‚          โ”‚          โ”‚
โ”‚   โ”‚ - Files  โ”‚    โ”‚ 2. Transform โ”‚    โ”‚ Clean    โ”‚          โ”‚
โ”‚   โ”‚ - APIs   โ”‚    โ”‚ 3. Load      โ”‚    โ”‚ Data     โ”‚          โ”‚
โ”‚   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜         โ”‚
โ”‚                                                             โ”‚
โ”‚   ๋ณ€ํ™˜์ด ์ค‘๊ฐ„ ์„œ๋ฒ„์—์„œ ์ˆ˜ํ–‰๋จ                                  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

1.2 ETL ์˜ˆ์‹œ ์ฝ”๋“œ

import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

class ETLPipeline:
    """Traditional ETL pipeline: Extract -> Transform -> Load.

    Transformation runs on the ETL server (here: in pandas) *before*
    the data ever reaches the target warehouse.
    """

    def __init__(self, source_conn: str, target_conn: str):
        # SQLAlchemy connection URLs for the source DB and target warehouse.
        self.source_engine = create_engine(source_conn)
        self.target_engine = create_engine(target_conn)

    def extract(self, query: str) -> pd.DataFrame:
        """Extract: pull rows from the source system with a SQL query."""
        print(f"[Extract] Starting at {datetime.now()}")
        df = pd.read_sql(query, self.source_engine)
        print(f"[Extract] Extracted {len(df)} rows")
        return df

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform: clean and enrich the extracted data.

        This is the resource-intensive step that runs on the ETL server.

        Raises:
            ValueError: if any negative ``amount`` survives cleaning.
        """
        print(f"[Transform] Starting at {datetime.now()}")

        # 1. Missing values: required keys must be present; emails get a default.
        df = df.dropna(subset=['customer_id', 'amount'])
        df['email'] = df['email'].fillna('unknown@example.com')

        # 2. Type normalization.
        df['order_date'] = pd.to_datetime(df['order_date'])
        df['amount'] = df['amount'].astype(float)

        # 3. Derived calendar columns.
        df['year'] = df['order_date'].dt.year
        df['month'] = df['order_date'].dt.month
        df['day_of_week'] = df['order_date'].dt.dayofweek

        # 4. Business rule: segment customers by lifetime purchases.
        df['customer_segment'] = df['total_purchases'].apply(
            lambda x: 'Gold' if x > 10000 else ('Silver' if x > 5000 else 'Bronze')
        )

        # 5. Data-quality check. Use an explicit exception instead of
        #    `assert`, which is silently stripped under `python -O`.
        #    (`.any()` also behaves correctly for an empty frame, where
        #    the old `df['amount'].min() >= 0` assert failed on NaN.)
        if (df['amount'] < 0).any():
            raise ValueError("Negative amounts found")

        print(f"[Transform] Transformed {len(df)} rows")
        return df

    def load(self, df: pd.DataFrame, table_name: str):
        """Load: write the cleaned frame into the target warehouse."""
        print(f"[Load] Starting at {datetime.now()}")

        # Full refresh: drop and recreate the target table.
        df.to_sql(
            table_name,
            self.target_engine,
            if_exists='replace',
            index=False,
            chunksize=10000
        )

        print(f"[Load] Loaded {len(df)} rows to {table_name}")

    def run(self, source_query: str, target_table: str):
        """Run the full pipeline in E -> T -> L order."""
        start_time = datetime.now()
        print(f"ETL Pipeline started at {start_time}")

        raw_data = self.extract(source_query)
        transformed_data = self.transform(raw_data)
        self.load(transformed_data, target_table)

        end_time = datetime.now()
        # total_seconds() is correct for any duration; `.seconds` silently
        # drops whole days and truncates fractions.
        print(f"ETL Pipeline completed in {(end_time - start_time).total_seconds():.0f} seconds")


# Usage example: load yesterday's orders into the warehouse.
if __name__ == "__main__":
    daily_orders_query = """
            SELECT
                o.order_id,
                o.customer_id,
                c.email,
                o.order_date,
                o.amount,
                c.total_purchases
            FROM orders o
            JOIN customers c ON o.customer_id = c.customer_id
            WHERE o.order_date >= CURRENT_DATE - INTERVAL '1 day'
        """

    etl = ETLPipeline(
        source_conn="postgresql://user:pass@source-db:5432/sales",
        target_conn="postgresql://user:pass@warehouse:5432/analytics"
    )
    etl.run(source_query=daily_orders_query, target_table="fact_daily_orders")

1.3 ETL ๋„๊ตฌ

๋„๊ตฌ ์œ ํ˜• ํŠน์ง•
Informatica ์ƒ์šฉ ์—”ํ„ฐํ”„๋ผ์ด์ฆˆ๊ธ‰, GUI ๊ธฐ๋ฐ˜
Talend ์˜คํ”ˆ์†Œ์Šค/์ƒ์šฉ Java ๊ธฐ๋ฐ˜, ๋‹ค์–‘ํ•œ ์ปค๋„ฅํ„ฐ
SSIS ์ƒ์šฉ (MS) SQL Server ํ†ตํ•ฉ
Pentaho ์˜คํ”ˆ์†Œ์Šค ๊ฒฝ๋Ÿ‰, ์‚ฌ์šฉ ํŽธ์˜
Apache NiFi ์˜คํ”ˆ์†Œ์Šค ๋ฐ์ดํ„ฐ ํ”Œ๋กœ์šฐ, ์‹ค์‹œ๊ฐ„

2. ELT (Extract, Load, Transform)

2.1 ELT ํ”„๋กœ์„ธ์Šค

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                      ELT Process                            โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ”‚
โ”‚   โ”‚ Sources  โ”‚ โ†’ โ”‚ Load Raw     โ”‚ โ†’ โ”‚ Transform    โ”‚      โ”‚
โ”‚   โ”‚          โ”‚    โ”‚ (Data Lake)  โ”‚    โ”‚ (in DW)      โ”‚      โ”‚
โ”‚   โ”‚ - DB     โ”‚    โ”‚              โ”‚    โ”‚              โ”‚      โ”‚
โ”‚   โ”‚ - Files  โ”‚    โ”‚ Raw Zone     โ”‚    โ”‚ SQL/Spark    โ”‚      โ”‚
โ”‚   โ”‚ - APIs   โ”‚    โ”‚ (as-is)      โ”‚    โ”‚ (DW ๋ฆฌ์†Œ์Šค)  โ”‚      โ”‚
โ”‚   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜     โ”‚
โ”‚                                                             โ”‚
โ”‚   ๋ณ€ํ™˜์ด ํƒ€๊ฒŸ ์‹œ์Šคํ…œ(DW/Lake)์—์„œ ์ˆ˜ํ–‰๋จ                       โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

2.2 ELT ์˜ˆ์‹œ ์ฝ”๋“œ

import pandas as pd
from datetime import datetime

class ELTPipeline:
    """Modern ELT pipeline: load raw data first, transform inside the DW."""

    def __init__(self, source_conn: str, warehouse_conn: str):
        # Connection URLs (or pre-built connectables). Engines are created
        # lazily so the object can be constructed without a DB driver.
        self.source_conn = source_conn
        self.warehouse_conn = warehouse_conn

    @staticmethod
    def _engine(conn):
        """Turn a connection URL into a SQLAlchemy engine.

        Passes *conn* through unchanged when it is already an
        engine/connectable.
        """
        if isinstance(conn, str):
            from sqlalchemy import create_engine
            return create_engine(conn)
        return conn

    def extract_and_load(self, source_query: str, raw_table: str):
        """Extract & Load: copy source rows into the raw table as-is.

        No transformation happens here — raw data lands quickly.
        """
        print(f"[Extract & Load] Starting at {datetime.now()}")

        df = pd.read_sql(source_query, self._engine(self.source_conn))

        # BUG FIX: DataFrame.to_sql requires an engine/connection, not a
        # plain URL string, as `con`.
        df.to_sql(
            raw_table,
            self._engine(self.warehouse_conn),
            if_exists='replace',
            index=False
        )

        print(f"[Extract & Load] Loaded {len(df)} rows to {raw_table}")

    def transform_in_warehouse(self, transform_sql: str):
        """Transform: run SQL inside the warehouse (dbt-style).

        Uses the warehouse's own compute power for the heavy lifting.
        """
        print(f"[Transform] Starting at {datetime.now()}")

        from sqlalchemy import text
        # BUG FIX: the original called .connect() on the connection
        # *string*. Build an engine first; wrap raw SQL in text() and
        # commit explicitly, as required by SQLAlchemy 2.x.
        with self._engine(self.warehouse_conn).connect() as conn:
            conn.execute(text(transform_sql))
            conn.commit()

        print(f"[Transform] Transformation completed")


# dbt ๋ชจ๋ธ ์˜ˆ์‹œ (SQL ๊ธฐ๋ฐ˜ ๋ณ€ํ™˜)
DBT_MODEL_EXAMPLE = """
-- models/staging/stg_orders.sql
-- dbt๋ฅผ ์‚ฌ์šฉํ•œ ELT ๋ณ€ํ™˜

WITH source AS (
    SELECT * FROM {{ source('raw', 'orders_raw') }}
),

cleaned AS (
    SELECT
        order_id,
        customer_id,
        COALESCE(email, 'unknown@example.com') AS email,
        CAST(order_date AS DATE) AS order_date,
        CAST(amount AS DECIMAL(10, 2)) AS amount,
        total_purchases,
        -- ํŒŒ์ƒ ์ปฌ๋Ÿผ
        EXTRACT(YEAR FROM order_date) AS order_year,
        EXTRACT(MONTH FROM order_date) AS order_month,
        EXTRACT(DOW FROM order_date) AS day_of_week,
        -- ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง
        CASE
            WHEN total_purchases > 10000 THEN 'Gold'
            WHEN total_purchases > 5000 THEN 'Silver'
            ELSE 'Bronze'
        END AS customer_segment,
        -- ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ
        CURRENT_TIMESTAMP AS loaded_at
    FROM source
    WHERE customer_id IS NOT NULL
      AND amount IS NOT NULL
      AND amount >= 0
)

SELECT * FROM cleaned
"""


# ์‹ค์ œ ELT ํŒŒ์ดํ”„๋ผ์ธ (Snowflake/BigQuery ์Šคํƒ€์ผ)
class ModernELTWithSQL:
    """SQL ๊ธฐ๋ฐ˜ ๋ชจ๋˜ ELT"""

    def __init__(self, warehouse):
        self.warehouse = warehouse

    def extract_load(self, source: str, target_raw: str):
        """์›๋ณธ โ†’ Raw ๋ ˆ์ด์–ด"""
        copy_sql = f"""
        COPY INTO {target_raw}
        FROM @{source}
        FILE_FORMAT = (TYPE = 'PARQUET')
        """
        self.warehouse.execute(copy_sql)

    def transform_staging(self):
        """Raw โ†’ Staging ๋ ˆ์ด์–ด"""
        staging_sql = """
        CREATE OR REPLACE TABLE staging.orders AS
        SELECT
            order_id,
            customer_id,
            PARSE_JSON(raw_data):email::STRING AS email,
            TO_DATE(raw_data:order_date) AS order_date,
            raw_data:amount::NUMBER(10,2) AS amount
        FROM raw.orders_raw
        """
        self.warehouse.execute(staging_sql)

    def transform_mart(self):
        """Staging โ†’ Mart ๋ ˆ์ด์–ด"""
        mart_sql = """
        CREATE OR REPLACE TABLE mart.fact_orders AS
        SELECT
            o.order_id,
            d.date_sk,
            c.customer_sk,
            o.amount,
            -- ์ง‘๊ณ„
            SUM(o.amount) OVER (
                PARTITION BY o.customer_id
                ORDER BY o.order_date
            ) AS cumulative_amount
        FROM staging.orders o
        JOIN dim_date d ON o.order_date = d.full_date
        JOIN dim_customer c ON o.customer_id = c.customer_id
        """
        self.warehouse.execute(mart_sql)

2.3 ELT ๋„๊ตฌ

๋„๊ตฌ ์œ ํ˜• ํŠน์ง•
dbt ์˜คํ”ˆ์†Œ์Šค SQL ๊ธฐ๋ฐ˜ ๋ณ€ํ™˜, ํ…Œ์ŠคํŠธ, ๋ฌธ์„œํ™”
Fivetran ์ƒ์šฉ ์ž๋™ ์Šคํ‚ค๋งˆ ๊ด€๋ฆฌ, 150+ ์ปค๋„ฅํ„ฐ
Airbyte ์˜คํ”ˆ์†Œ์Šค ์ปค์Šคํ…€ ์ปค๋„ฅํ„ฐ, EL ํŠนํ™”
Stitch ์ƒ์šฉ ๊ฐ„ํŽธํ•œ ์„ค์ •, SaaS ์นœํ™”์ 
AWS Glue ํด๋ผ์šฐ๋“œ ์„œ๋ฒ„๋ฆฌ์Šค, Spark ๊ธฐ๋ฐ˜

3. ETL vs ELT ๋น„๊ต

3.1 ์ƒ์„ธ ๋น„๊ต

| ํŠน์„ฑ | ETL | ELT |
|------|-----|-----|
| ๋ณ€ํ™˜ ์œ„์น˜ | ์ค‘๊ฐ„ ์„œ๋ฒ„ | ํƒ€๊ฒŸ ์‹œ์Šคํ…œ (DW/Lake) |
| ๋ฐ์ดํ„ฐ ์ด๋™ | ๋ณ€ํ™˜๋œ ๋ฐ์ดํ„ฐ๋งŒ | ์›๋ณธ ๋ฐ์ดํ„ฐ ์ „์ฒด |
| ์Šคํ‚ค๋งˆ | ๋ฏธ๋ฆฌ ์ •์˜ ํ•„์š” (Schema-on-Write) | ์œ ์—ฐ (Schema-on-Read) |
| ์ฒ˜๋ฆฌ ์†๋„ | ๋А๋ฆผ (์ค‘๊ฐ„ ์ฒ˜๋ฆฌ) | ๋น ๋ฆ„ (๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ) |
| ๋น„์šฉ | ๋ณ„๋„ ์ธํ”„๋ผ ํ•„์š” | DW ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ |
| ์œ ์—ฐ์„ฑ | ๋‚ฎ์Œ | ๋†’์Œ (์›๋ณธ ๋ณด์กด) |
| ๋ณต์žกํ•œ ๋ณ€ํ™˜ | ์ ํ•ฉ | ์ œํ•œ์  |
| ์‹ค์‹œ๊ฐ„ ์ฒ˜๋ฆฌ | ์–ด๋ ค์›€ | ๋น„๊ต์  ์šฉ์ด |

3.2 ์„ ํƒ ๊ธฐ์ค€

def choose_etl_or_elt(requirements: dict) -> str:
    """Recommend ETL or ELT from a requirements checklist.

    Each truthy flag counts as one vote for its side; the side with
    more votes wins, and a tie suggests a hybrid approach.
    """
    # Situations that favor classic ETL.
    etl_signals = (
        'data_privacy',        # sensitive data must be masked before load
        'complex_transforms',  # heavy business logic
        'legacy_systems',      # legacy system integration
        'small_data',          # small data volumes
    )
    # Situations that favor modern ELT.
    elt_signals = (
        'big_data',            # large data volumes
        'cloud_dw',            # cloud data warehouse in use
        'data_lake',           # building a data lake
        'flexible_schema',     # schema flexibility needed
        'raw_data_access',     # raw data must stay queryable
        'sql_transforms',      # transforms expressible in SQL
    )

    etl_score = sum(requirements.get(key, False) for key in etl_signals)
    elt_score = sum(requirements.get(key, False) for key in elt_signals)

    if etl_score > elt_score:
        return "ETL ๊ถŒ์žฅ"
    if elt_score > etl_score:
        return "ELT ๊ถŒ์žฅ"
    return "ํ•˜์ด๋ธŒ๋ฆฌ๋“œ ๊ณ ๋ ค"


# Usage example
project_requirements = dict(
    big_data=True,
    cloud_dw=True,  # e.g. Snowflake, BigQuery
    sql_transforms=True,
    raw_data_access=True,
)

recommendation = choose_etl_or_elt(project_requirements)
print(recommendation)  # "ELT ๊ถŒ์žฅ"

4. ํ•˜์ด๋ธŒ๋ฆฌ๋“œ ์ ‘๊ทผ๋ฒ•

4.1 ETLT ํŒจํ„ด

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    ETLT (Hybrid) Pattern                    โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚   Sources โ†’ [E] โ†’ [T] โ†’ [L] โ†’ [T] โ†’ Mart                   โ”‚
โ”‚                    โ†‘          โ†‘                             โ”‚
โ”‚                    โ”‚          โ”‚                             โ”‚
โ”‚            Light Transform   Heavy Transform                โ”‚
โ”‚            (๋งˆ์Šคํ‚น, ๊ฒ€์ฆ)    (์ง‘๊ณ„, ์กฐ์ธ)                    โ”‚
โ”‚            ETL Server        Data Warehouse                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

4.2 ํ•˜์ด๋ธŒ๋ฆฌ๋“œ ๊ตฌํ˜„ ์˜ˆ์‹œ

class HybridPipeline:
    """Hybrid ETL + ELT pipeline.

    Light, privacy-sensitive transforms run close to the source (ETL
    phase); heavy joins and aggregations run inside the warehouse
    (ELT phase).
    """

    def __init__(self, source, staging_area, warehouse):
        # All three collaborators are injected: a queryable source,
        # a staging area with .load(), and a SQL-executing warehouse.
        self.source = source
        self.staging = staging_area
        self.warehouse = warehouse

    def extract_with_light_transform(self):
        """E + light T: extract while applying cheap transforms.

        - PII masking (privacy protection)
        - basic type casts
        - required-field validation
        """
        light_sql = """
        SELECT
            order_id,
            -- PII ๋งˆ์Šคํ‚น (์†Œ์Šค์—์„œ ์ˆ˜ํ–‰)
            MD5(customer_email) AS customer_email_hash,
            SUBSTRING(phone, 1, 3) || '****' || SUBSTRING(phone, -4) AS phone_masked,
            -- ๊ธฐ๋ณธ ๋ณ€ํ™˜
            CAST(order_date AS DATE) AS order_date,
            CAST(amount AS DECIMAL(10, 2)) AS amount
        FROM orders
        WHERE amount IS NOT NULL
        """
        return self.source.execute(light_sql)

    def load_to_staging(self, data):
        """L: land the lightly-transformed rows in the staging area."""
        self.staging.load(data, 'orders_staging')

    def transform_in_warehouse(self):
        """Heavy T: complex transforms inside the warehouse
        (joins, aggregations, window functions)."""
        mart_build_sql = """
        CREATE TABLE mart.order_analysis AS
        SELECT
            o.order_date,
            c.customer_segment,
            p.product_category,
            COUNT(*) AS order_count,
            SUM(o.amount) AS total_amount,
            AVG(o.amount) AS avg_order_value,
            -- ์œˆ๋„์šฐ ํ•จ์ˆ˜ (DW์—์„œ ํšจ์œจ์ )
            SUM(o.amount) OVER (
                PARTITION BY c.customer_segment
                ORDER BY o.order_date
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) AS rolling_7day_amount
        FROM orders_staging o
        JOIN dim_customer c ON o.customer_id = c.customer_id
        JOIN dim_product p ON o.product_id = p.product_id
        GROUP BY o.order_date, c.customer_segment, p.product_category
        """
        self.warehouse.execute(mart_build_sql)

    def run(self):
        """Execute the hybrid pipeline end to end."""
        extracted = self.extract_with_light_transform()  # Phase 1: ETL (E + light T)
        self.load_to_staging(extracted)                  # Phase 2: load to staging
        self.transform_in_warehouse()                    # Phase 3: ELT (heavy T in DW)

5. ์‹ค๋ฌด ์‚ฌ๋ก€

5.1 ETL ์‚ฌ์šฉ ์‚ฌ๋ก€

# ์‚ฌ๋ก€ 1: ๊ฐœ์ธ์ •๋ณด ์ฒ˜๋ฆฌ (GDPR ์ค€์ˆ˜)
class GDPRCompliantETL:
    """GDPR ์ค€์ˆ˜ ETL - ๊ฐœ์ธ์ •๋ณด ๋งˆ์Šคํ‚น ํ›„ ์ ์žฌ"""

    def transform(self, df):
        # ๋ฏผ๊ฐ ์ •๋ณด ๋งˆ์Šคํ‚น (์ ์žฌ ์ „ ์ˆ˜ํ–‰)
        df['email'] = df['email'].apply(self.mask_email)
        df['ssn'] = df['ssn'].apply(lambda x: 'XXX-XX-' + x[-4:])
        df['credit_card'] = df['credit_card'].apply(lambda x: '**** **** **** ' + x[-4:])

        # EU ์™ธ ์ง€์—ญ์œผ๋กœ ๋ฐ์ดํ„ฐ ์ „์†ก ์ „ ๋ณ€ํ™˜
        df = df[df['consent_given'] == True]

        return df

    def mask_email(self, email):
        if pd.isna(email):
            return None
        local, domain = email.split('@')
        return local[:2] + '***@' + domain


# Case 2: legacy system integration
class LegacySystemETL:
    """Integrate fixed-width records exported from a legacy mainframe."""

    def transform(self, raw_data):
        """Parse fixed-width records into a DataFrame.

        Record layout: [0:10] account number, [10:12] account type,
        [12:24] balance in hundredths (cents), [24] status flag
        ('1' = active), [25:33] date as YYYYMMDD.

        Blank lines are skipped — the original crashed with
        ``int('')`` on the empty element produced by a trailing newline.
        """
        records = []
        for line in raw_data.split('\n'):
            if not line.strip():
                continue  # skip empty / trailing lines
            record = {
                'account_no': line[0:10].strip(),
                'account_type': line[10:12],
                'balance': int(line[12:24]) / 100,  # hundredths -> currency units
                'status': 'A' if line[24:25] == '1' else 'I',
                'date': self.parse_legacy_date(line[25:33])
            }
            records.append(record)
        return pd.DataFrame(records)

    def parse_legacy_date(self, date_str):
        """YYYYMMDD -> YYYY-MM-DD (no validation; mirrors source format)."""
        return f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}"

5.2 ELT ์‚ฌ์šฉ ์‚ฌ๋ก€

-- Case 1: e-commerce analytics with dbt
-- (two dbt models: a staging view and a mart fact table)

-- models/staging/stg_orders.sql
-- Staging model: light cleanup of the raw orders source.
WITH raw_orders AS (
    SELECT * FROM {{ source('raw', 'orders') }}
)
SELECT
    order_id,
    customer_id,
    order_date,
    status,
    total_amount
FROM raw_orders
WHERE order_date IS NOT NULL

-- models/marts/core/fct_orders.sql
-- Mart model: join staged orders with dimensions and add per-customer
-- ordering history via window functions.
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
),
products AS (
    SELECT * FROM {{ ref('dim_products') }}
)
SELECT
    o.order_id,
    o.order_date,
    c.customer_segment,
    p.product_category,
    o.total_amount,
    -- Window functions leverage the warehouse's own compute.
    ROW_NUMBER() OVER (PARTITION BY o.customer_id ORDER BY o.order_date) AS order_sequence,
    LAG(o.order_date) OVER (PARTITION BY o.customer_id ORDER BY o.order_date) AS prev_order_date
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id

-- Case 2: BigQuery ELT
-- Large-scale log analysis (serverless processing of yesterday's events)
-- NOTE(review): the query groups by event_type, so within any single
-- group COUNTIF(event_type = 'purchase') is either 0 or event_count —
-- confirm whether per-(user, date) grouping without event_type was intended.
CREATE OR REPLACE TABLE analytics.user_behavior AS
SELECT
    user_id,
    DATE(timestamp) AS event_date,
    event_type,
    COUNT(*) AS event_count,
    COUNTIF(event_type = 'purchase') AS purchase_count,
    SUM(CASE WHEN event_type = 'purchase' THEN revenue ELSE 0 END) AS total_revenue,
    -- Session analysis (ordered event arrays via ARRAY_AGG)
    ARRAY_AGG(
        STRUCT(timestamp, event_type, page_url)
        ORDER BY timestamp
    ) AS event_sequence
FROM raw.events
WHERE DATE(timestamp) = CURRENT_DATE() - 1
GROUP BY user_id, DATE(timestamp), event_type;

6. ๋„๊ตฌ ์„ ํƒ ๊ฐ€์ด๋“œ

6.1 ๋ฐ์ดํ„ฐ ๊ทœ๋ชจ๋ณ„ ๊ถŒ์žฅ ๋„๊ตฌ

๋ฐ์ดํ„ฐ ๊ทœ๋ชจ ETL ๋„๊ตฌ ELT ๋„๊ตฌ
์†Œ๊ทœ๋ชจ (< 1GB) Python + Pandas dbt + PostgreSQL
์ค‘๊ทœ๋ชจ (1GB-100GB) Airflow + Python dbt + Snowflake
๋Œ€๊ทœ๋ชจ (> 100GB) Spark dbt + BigQuery/Databricks

6.2 ์•„ํ‚คํ…์ฒ˜๋ณ„ ๊ถŒ์žฅ

# Recommended approach and tooling per target architecture.
# Keys: architecture style; values: approach (ETL/ELT/ETLT), candidate
# tools, and a short rationale (rationale strings are in Korean).
architecture_recommendations = {
    "traditional_dw": {
        "approach": "ETL",
        "tools": ["Informatica", "Talend", "SSIS"],
        "reason": "์Šคํ‚ค๋งˆ ์—„๊ฒฉ, ๋ณ€ํ™˜ ํ›„ ์ ์žฌ"
    },
    "cloud_dw": {
        "approach": "ELT",
        "tools": ["dbt", "Fivetran + dbt", "Airbyte + dbt"],
        "reason": "DW ์ปดํ“จํŒ… ํŒŒ์›Œ ํ™œ์šฉ, ์›๋ณธ ๋ณด์กด"
    },
    "data_lake": {
        "approach": "ELT",
        "tools": ["Spark", "AWS Glue", "Databricks"],
        "reason": "์Šคํ‚ค๋งˆ ์œ ์—ฐ, ๋Œ€์šฉ๋Ÿ‰ ์ฒ˜๋ฆฌ"
    },
    "hybrid": {
        "approach": "ETLT",
        "tools": ["Airflow + dbt", "Prefect + dbt"],
        "reason": "๋ฏผ๊ฐ ์ •๋ณด ์ฒ˜๋ฆฌ + DW ๋ณ€ํ™˜"
    }
}

์—ฐ์Šต ๋ฌธ์ œ

๋ฌธ์ œ 1: ETL vs ELT ์„ ํƒ

๋‹ค์Œ ์ƒํ™ฉ์—์„œ ETL๊ณผ ELT ์ค‘ ์–ด๋–ค ๋ฐฉ์‹์ด ์ ํ•ฉํ•œ์ง€ ์„ ํƒํ•˜๊ณ  ์ด์œ ๋ฅผ ์„ค๋ช…ํ•˜์„ธ์š”: - ์ผ์ผ 100GB์˜ ๋กœ๊ทธ ๋ฐ์ดํ„ฐ๋ฅผ BigQuery์— ์ ์žฌ - ๊ฐœ์ธ์ •๋ณด๊ฐ€ ํฌํ•จ๋œ ๊ณ ๊ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌ

๋ฌธ์ œ 2: ELT SQL ์ž‘์„ฑ

Raw ํ…Œ์ด๋ธ” raw_orders์—์„œ ์ผ๋ณ„ ๋งค์ถœ ์ง‘๊ณ„ ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑํ•˜๋Š” ELT SQL์„ ์ž‘์„ฑํ•˜์„ธ์š”.


์š”์•ฝ

๊ฐœ๋… ์„ค๋ช…
ETL ๋ณ€ํ™˜ ํ›„ ์ ์žฌ, ์ค‘๊ฐ„ ์„œ๋ฒ„์—์„œ ์ฒ˜๋ฆฌ
ELT ์ ์žฌ ํ›„ ๋ณ€ํ™˜, ํƒ€๊ฒŸ ์‹œ์Šคํ…œ์—์„œ ์ฒ˜๋ฆฌ
ETL ์žฅ์  ๋ฐ์ดํ„ฐ ํ’ˆ์งˆ ๋ณด์žฅ, ๋ฏผ๊ฐ ์ •๋ณด ์ฒ˜๋ฆฌ
ELT ์žฅ์  ๋น ๋ฅธ ์ ์žฌ, ์œ ์—ฐํ•œ ์Šคํ‚ค๋งˆ, ์›๋ณธ ๋ณด์กด
ํ•˜์ด๋ธŒ๋ฆฌ๋“œ ETL + ELT ์กฐํ•ฉ, ์ƒํ™ฉ์— ๋งž๊ฒŒ ์„ ํƒ

์ฐธ๊ณ  ์ž๋ฃŒ

Use the lesson navigation links to move between lessons.