dbt λ³€ν™˜ 도ꡬ

dbt λ³€ν™˜ 도ꡬ

κ°œμš”

dbt(data build tool)λŠ” SQL 기반의 데이터 λ³€ν™˜ λ„κ΅¬μž…λ‹ˆλ‹€. ELT νŒ¨ν„΄μ—μ„œ Transform 단계λ₯Ό λ‹΄λ‹Ήν•˜λ©°, μ†Œν”„νŠΈμ›¨μ–΄ μ—”μ§€λ‹ˆμ–΄λ§ λͺ¨λ²” 사둀(버전 관리, ν…ŒμŠ€νŠΈ, λ¬Έμ„œν™”)λ₯Ό 데이터 λ³€ν™˜μ— μ μš©ν•©λ‹ˆλ‹€.


1. dbt κ°œμš”

1.1 dbtλž€?

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        dbt μ—­ν•                                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                β”‚
β”‚   ELT νŒŒμ΄ν”„λΌμΈμ—μ„œ T(Transform) λ‹΄λ‹Ή                          β”‚
β”‚                                                                β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”               β”‚
β”‚   β”‚ Extract  β”‚ β†’  β”‚   Load   β”‚ β†’  β”‚Transform β”‚               β”‚
β”‚   β”‚ (Fivetranβ”‚    β”‚  (to DW) β”‚    β”‚  (dbt)   β”‚               β”‚
β”‚   β”‚  Airbyte)β”‚    β”‚          β”‚    β”‚          β”‚               β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜               β”‚
β”‚                                                                β”‚
β”‚   dbt 핡심 κΈ°λŠ₯:                                               β”‚
β”‚   - SQL 기반 λͺ¨λΈ μ •μ˜                                         β”‚
β”‚   - μ˜μ‘΄μ„± μžλ™ 관리                                           β”‚
β”‚   - ν…ŒμŠ€νŠΈ 및 λ¬Έμ„œν™”                                           β”‚
β”‚   - Jinja ν…œν”Œλ¦Ώ 지원                                          β”‚
β”‚   - 버전 관리 (Git)                                            β”‚
β”‚                                                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1.2 dbt Core vs dbt Cloud

νŠΉμ„± dbt Core dbt Cloud
λΉ„μš© 무료 (μ˜€ν”ˆμ†ŒμŠ€) 유료 (SaaS)
μ‹€ν–‰ CLI Web UI + API
μŠ€μΌ€μ€„λ§ μ™ΈλΆ€ 도ꡬ ν•„μš” (Airflow) λ‚΄μž₯ μŠ€μΌ€μ€„λŸ¬
IDE VS Code λ“± λ‚΄μž₯ IDE
ν˜‘μ—… Git μ‚¬μš© λ‚΄μž₯ ν˜‘μ—… κΈ°λŠ₯

1.3 μ„€μΉ˜

# dbt Core μ„€μΉ˜
pip install dbt-core

# λ°μ΄ν„°λ² μ΄μŠ€λ³„ μ–΄λŒ‘ν„° μ„€μΉ˜
pip install dbt-postgres      # PostgreSQL
pip install dbt-snowflake     # Snowflake
pip install dbt-bigquery      # BigQuery
pip install dbt-redshift      # Redshift
pip install dbt-databricks    # Databricks

# 버전 확인
dbt --version

2. ν”„λ‘œμ νŠΈ ꡬ쑰

2.1 ν”„λ‘œμ νŠΈ μ΄ˆκΈ°ν™”

# μƒˆ ν”„λ‘œμ νŠΈ 생성
dbt init my_project
cd my_project

# ν”„λ‘œμ νŠΈ ꡬ쑰
my_project/
β”œβ”€β”€ dbt_project.yml          # ν”„λ‘œμ νŠΈ μ„€μ •
β”œβ”€β”€ profiles.yml             # μ—°κ²° μ„€μ • (~/.dbt/profiles.yml)
β”œβ”€β”€ models/                  # SQL λͺ¨λΈ
β”‚   β”œβ”€β”€ staging/            # μŠ€ν…Œμ΄μ§• λͺ¨λΈ
β”‚   β”œβ”€β”€ intermediate/       # 쀑간 λͺ¨λΈ
β”‚   └── marts/              # μ΅œμ’… λͺ¨λΈ
β”œβ”€β”€ tests/                   # μ»€μŠ€ν…€ ν…ŒμŠ€νŠΈ
β”œβ”€β”€ macros/                  # μž¬μ‚¬μš© 맀크둜
β”œβ”€β”€ seeds/                   # μ‹œλ“œ 데이터 (CSV)
β”œβ”€β”€ snapshots/               # SCD μŠ€λƒ…μƒ·
β”œβ”€β”€ analyses/                # 뢄석 쿼리
└── target/                  # 컴파일된 κ²°κ³Ό

2.2 μ„€μ • 파일

# dbt_project.yml
name: 'my_project'
version: '1.0.0'
config-version: 2

profile: 'my_project'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

# λͺ¨λΈλ³„ μ„€μ •
models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: marts
# profiles.yml (~/.dbt/profiles.yml)
my_project:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: postgres
      password: "{{ env_var('DB_PASSWORD') }}"
      dbname: analytics
      schema: dbt_dev
      threads: 4

    prod:
      type: postgres
      host: prod-db.example.com
      port: 5432
      user: "{{ env_var('PROD_USER') }}"
      password: "{{ env_var('PROD_PASSWORD') }}"
      dbname: analytics
      schema: dbt_prod
      threads: 8

3. λͺ¨λΈ (Models)

3.1 κΈ°λ³Έ λͺ¨λΈ

-- models/staging/stg_orders.sql
-- μŠ€ν…Œμ΄μ§• λͺ¨λΈ: μ†ŒμŠ€ 데이터 μ •μ œ

SELECT
    order_id,
    customer_id,
    CAST(order_date AS DATE) AS order_date,
    CAST(amount AS DECIMAL(10, 2)) AS amount,
    status,
    CURRENT_TIMESTAMP AS loaded_at
FROM {{ source('raw', 'orders') }}
WHERE order_id IS NOT NULL
-- models/staging/stg_customers.sql
SELECT
    customer_id,
    TRIM(first_name) AS first_name,
    TRIM(last_name) AS last_name,
    LOWER(email) AS email,
    created_at
FROM {{ source('raw', 'customers') }}
-- models/marts/core/fct_orders.sql
-- 팩트 ν…Œμ΄λΈ”: μ£Όλ¬Έ

{{
    config(
        materialized='table',
        unique_key='order_id',
        partition_by={
            'field': 'order_date',
            'data_type': 'date',
            'granularity': 'month'
        }
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
)

SELECT
    o.order_id,
    o.customer_id,
    c.first_name,
    c.last_name,
    c.email,
    o.order_date,
    o.amount,
    o.status,
    -- νŒŒμƒ 컬럼
    EXTRACT(YEAR FROM o.order_date) AS order_year,
    EXTRACT(MONTH FROM o.order_date) AS order_month,
    CASE
        WHEN o.amount > 1000 THEN 'high'
        WHEN o.amount > 100 THEN 'medium'
        ELSE 'low'
    END AS order_tier
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id

3.2 μ†ŒμŠ€ μ •μ˜

# models/staging/_sources.yml
version: 2

sources:
  - name: raw
    description: "원본 데이터 μ†ŒμŠ€"
    database: raw_db
    schema: public
    tables:
      - name: orders
        description: "μ£Όλ¬Έ 원본 ν…Œμ΄λΈ”"
        columns:
          - name: order_id
            description: "주문 고유 ID"
            tests:
              - unique
              - not_null
          - name: customer_id
            description: "고객 ID"
          - name: amount
            description: "μ£Όλ¬Έ κΈˆμ•‘"

      - name: customers
        description: "고객 원본 ν…Œμ΄λΈ”"
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: updated_at

3.3 Materialization μœ ν˜•

-- View (κΈ°λ³Έκ°’): 맀번 쿼리 μ‹€ν–‰
{{ config(materialized='view') }}

-- Table: 물리적 ν…Œμ΄λΈ” 생성
{{ config(materialized='table') }}

-- Incremental: 증뢄 처리
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='append_new_columns'
) }}

SELECT *
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}

-- Ephemeral: CTE둜 인라인 (ν…Œμ΄λΈ” 미생성)
{{ config(materialized='ephemeral') }}

4. ν…ŒμŠ€νŠΈ

4.1 μŠ€ν‚€λ§ˆ ν…ŒμŠ€νŠΈ

# models/marts/core/_schema.yml
version: 2

models:
  - name: fct_orders
    description: "μ£Όλ¬Έ 팩트 ν…Œμ΄λΈ”"
    columns:
      - name: order_id
        description: "주문 고유 ID"
        tests:
          - unique
          - not_null

      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']

4.2 μ»€μŠ€ν…€ ν…ŒμŠ€νŠΈ

-- tests/assert_positive_amounts.sql
-- λͺ¨λ“  μ£Όλ¬Έ κΈˆμ•‘μ΄ μ–‘μˆ˜μΈμ§€ 확인

SELECT
    order_id,
    amount
FROM {{ ref('fct_orders') }}
WHERE amount < 0
-- macros/test_row_count_equal.sql
{% test row_count_equal(model, compare_model) %}

WITH model_count AS (
    SELECT COUNT(*) AS cnt FROM {{ model }}
),

compare_count AS (
    SELECT COUNT(*) AS cnt FROM {{ compare_model }}
)

SELECT
    m.cnt AS model_count,
    c.cnt AS compare_count
FROM model_count m
CROSS JOIN compare_count c
WHERE m.cnt != c.cnt

{% endtest %}

4.3 ν…ŒμŠ€νŠΈ μ‹€ν–‰

# λͺ¨λ“  ν…ŒμŠ€νŠΈ μ‹€ν–‰
dbt test

# νŠΉμ • λͺ¨λΈ ν…ŒμŠ€νŠΈ
dbt test --select fct_orders

# μ†ŒμŠ€ freshness ν…ŒμŠ€νŠΈ
dbt source freshness

5. λ¬Έμ„œν™”

5.1 λ¬Έμ„œ μ •μ˜

# models/marts/core/_schema.yml
version: 2

models:
  - name: fct_orders
    description: |
      ## μ£Όλ¬Έ 팩트 ν…Œμ΄λΈ”

      이 ν…Œμ΄λΈ”μ€ λͺ¨λ“  μ£Όλ¬Έ νŠΈλžœμž­μ…˜μ„ ν¬ν•¨ν•©λ‹ˆλ‹€.

      ### μ‚¬μš© 사둀
      - 일별/월별 맀좜 뢄석
      - 고객 ꡬ맀 νŒ¨ν„΄ 뢄석
      - 재ꡬ맀율 계산

      ### κ°±μ‹  μ£ΌκΈ°
      - 맀일 06:00 UTC

    meta:
      owner: "data-team@company.com"
      contains_pii: false

    columns:
      - name: order_id
        description: "μ£Όλ¬Έ 고유 μ‹λ³„μž (UUID)"
        meta:
          dimension: true

      - name: amount
        description: "μ£Όλ¬Έ 총앑 (USD)"
        meta:
          measure: true
          aggregation: sum

5.2 λ¬Έμ„œ 생성 및 μ„œλΉ™

# λ¬Έμ„œ 생성
dbt docs generate

# λ¬Έμ„œ μ„œλ²„ μ‹€ν–‰
dbt docs serve --port 8080

# http://localhost:8080 μ—μ„œ 확인

6. Jinja ν…œν”Œλ¦Ώ

6.1 κΈ°λ³Έ Jinja 문법

-- λ³€μˆ˜
{% set my_var = 'value' %}
SELECT '{{ my_var }}' AS col

-- 쑰건문
SELECT
    CASE
        {% if target.name == 'prod' %}
        WHEN amount > 1000 THEN 'high'
        {% else %}
        WHEN amount > 100 THEN 'high'
        {% endif %}
        ELSE 'low'
    END AS tier
FROM orders

-- 반볡문
SELECT
    order_id,
    {% for col in ['amount', 'quantity', 'discount'] %}
    SUM({{ col }}) AS total_{{ col }}{% if not loop.last %},{% endif %}
    {% endfor %}
FROM order_items
GROUP BY order_id

6.2 Macros

-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name }}
    {%- endif -%}
{%- endmacro %}


-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}


-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev() %}
    {% if target.name == 'dev' %}
        LIMIT 1000
    {% endif %}
{% endmacro %}
-- 맀크둜 μ‚¬μš©
SELECT
    order_id,
    {{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM orders
{{ limit_data_in_dev() }}

6.3 dbt λ‚΄μž₯ ν•¨μˆ˜

-- ref(): λ‹€λ₯Έ λͺ¨λΈ μ°Έμ‘°
SELECT * FROM {{ ref('stg_orders') }}

-- source(): μ†ŒμŠ€ ν…Œμ΄λΈ” μ°Έμ‘°
SELECT * FROM {{ source('raw', 'orders') }}

-- this: ν˜„μž¬ λͺ¨λΈ μ°Έμ‘° (incrementalμ—μ„œ 유용)
{% if is_incremental() %}
SELECT MAX(updated_at) FROM {{ this }}
{% endif %}

-- config(): μ„€μ • κ°’ μ ‘κ·Ό
{{ config.get('materialized') }}

-- target: νƒ€κ²Ÿ ν™˜κ²½ 정보
{{ target.name }}    -- dev, prod
{{ target.schema }}  -- dbt_dev
{{ target.type }}    -- postgres, snowflake

7. 증뢄 처리 (Incremental)

7.1 κΈ°λ³Έ 증뢄 λͺ¨λΈ

-- models/marts/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        incremental_strategy='delete+insert'
    )
}}

SELECT
    event_id,
    user_id,
    event_type,
    event_data,
    created_at
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
-- λ§ˆμ§€λ§‰ μ‹€ν–‰ 이후 λ°μ΄ν„°λ§Œ 처리
WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}

7.2 증뢄 μ „λž΅

-- Append (κΈ°λ³Έ): μƒˆ 데이터 μΆ”κ°€λ§Œ
{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}

-- Delete+Insert: ν‚€ κΈ°μ€€ μ‚­μ œ ν›„ μ‚½μž…
{{ config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='delete+insert'
) }}

-- Merge (Snowflake, BigQuery): MERGE λ¬Έ μ‚¬μš©
{{ config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='merge',
    merge_update_columns=['name', 'amount', 'updated_at']
) }}

8. μ‹€ν–‰ λͺ…λ Ή

8.1 κΈ°λ³Έ λͺ…λ Ή

# μ—°κ²° ν…ŒμŠ€νŠΈ
dbt debug

# λͺ¨λ“  λͺ¨λΈ μ‹€ν–‰
dbt run

# νŠΉμ • λͺ¨λΈλ§Œ μ‹€ν–‰
dbt run --select fct_orders
dbt run --select staging.*
dbt run --select +fct_orders+  # μ˜μ‘΄μ„± 포함

# ν…ŒμŠ€νŠΈ μ‹€ν–‰
dbt test

# λΉŒλ“œ (run + test)
dbt build

# Seed 데이터 λ‘œλ“œ
dbt seed

# 컴파일만 (μ‹€ν–‰ μ•ˆ 함)
dbt compile

# 정리
dbt clean

8.2 μ„ νƒμž (Selectors)

# λͺ¨λΈ μ΄λ¦„μœΌλ‘œ
dbt run --select my_model

# 경둜둜
dbt run --select models/staging/*

# νƒœκ·Έλ‘œ
dbt run --select tag:daily

# μƒμœ„ μ˜μ‘΄μ„± 포함
dbt run --select +my_model

# ν•˜μœ„ μ˜μ‘΄μ„± 포함
dbt run --select my_model+

# μ–‘λ°©ν–₯
dbt run --select +my_model+

# νŠΉμ • λͺ¨λΈ μ œμ™Έ
dbt run --exclude my_model

μ—°μŠ΅ 문제

문제 1: μŠ€ν…Œμ΄μ§• λͺ¨λΈ

원본 products ν…Œμ΄λΈ”μ—μ„œ stg_products λͺ¨λΈμ„ μƒμ„±ν•˜μ„Έμš”. 가격을 λ‹¬λŸ¬λ‘œ λ³€ν™˜ν•˜κ³  NULL 값을 μ²˜λ¦¬ν•˜μ„Έμš”.

문제 2: 증뢄 λͺ¨λΈ

일별 판맀 집계 ν…Œμ΄λΈ”μ„ μ¦λΆ„μœΌλ‘œ μ²˜λ¦¬ν•˜λŠ” λͺ¨λΈμ„ μž‘μ„±ν•˜μ„Έμš”.

문제 3: ν…ŒμŠ€νŠΈ μž‘μ„±

fct_sales λͺ¨λΈμ— λŒ€ν•œ ν…ŒμŠ€νŠΈλ₯Ό μž‘μ„±ν•˜μ„Έμš” (unique, not_null, κΈˆμ•‘ μ–‘μˆ˜ 확인).


μš”μ•½

κ°œλ… μ„€λͺ…
Model SQL 기반 데이터 λ³€ν™˜ μ •μ˜
Source 원본 데이터 μ°Έμ‘°
ref() λͺ¨λΈ κ°„ μ°Έμ‘° (μ˜μ‘΄μ„± μžλ™ 관리)
Test 데이터 ν’ˆμ§ˆ 검증
Materialization view, table, incremental, ephemeral
Macro μž¬μ‚¬μš© κ°€λŠ₯ν•œ SQL ν…œν”Œλ¦Ώ

참고 자료

to navigate between lessons