dbt λ³ν λꡬ
dbt λ³ν λꡬ¶
κ°μ¶
dbt(data build tool)λ SQL κΈ°λ°μ λ°μ΄ν° λ³ν λꡬμ λλ€. ELT ν¨ν΄μμ Transform λ¨κ³λ₯Ό λ΄λΉνλ©°, μννΈμ¨μ΄ μμ§λμ΄λ§ λͺ¨λ² μ¬λ‘(λ²μ κ΄λ¦¬, ν μ€νΈ, λ¬Έμν)λ₯Ό λ°μ΄ν° λ³νμ μ μ©ν©λλ€.
1. dbt κ°μ¶
1.1 dbtλ?¶
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β dbt μν β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ELT νμ΄νλΌμΈμμ T(Transform) λ΄λΉ β
β β
β ββββββββββββ ββββββββββββ ββββββββββββ β
β β Extract β β β Load β β βTransform β β
β β (Fivetranβ β (to DW) β β (dbt) β β
β β Airbyte)β β β β β β
β ββββββββββββ ββββββββββββ ββββββββββββ β
β β
β dbt ν΅μ¬ κΈ°λ₯: β
β - SQL κΈ°λ° λͺ¨λΈ μ μ β
β - μμ‘΄μ± μλ κ΄λ¦¬ β
β - ν
μ€νΈ λ° λ¬Έμν β
β - Jinja ν
νλ¦Ώ μ§μ β
β - λ²μ κ΄λ¦¬ (Git) β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1.2 dbt Core vs dbt Cloud¶
| νΉμ± | dbt Core | dbt Cloud |
|---|---|---|
| λΉμ© | λ¬΄λ£ (μ€νμμ€) | μ λ£ (SaaS) |
| μ€ν | CLI | Web UI + API |
| μ€μΌμ€λ§ | μΈλΆ λꡬ νμ (Airflow) | λ΄μ₯ μ€μΌμ€λ¬ |
| IDE | VS Code λ± | λ΄μ₯ IDE |
| νμ | Git μ¬μ© | λ΄μ₯ νμ κΈ°λ₯ |
1.3 μ€μΉ¶
# dbt Core μ€μΉ
pip install dbt-core
# λ°μ΄ν°λ² μ΄μ€λ³ μ΄λν° μ€μΉ
pip install dbt-postgres # PostgreSQL
pip install dbt-snowflake # Snowflake
pip install dbt-bigquery # BigQuery
pip install dbt-redshift # Redshift
pip install dbt-databricks # Databricks
# λ²μ νμΈ
dbt --version
2. νλ‘μ νΈ κ΅¬μ‘°¶
2.1 νλ‘μ νΈ μ΄κΈ°ν¶
# μ νλ‘μ νΈ μμ±
dbt init my_project
cd my_project
# νλ‘μ νΈ κ΅¬μ‘°
my_project/
βββ dbt_project.yml # νλ‘μ νΈ μ€μ
βββ profiles.yml # μ°κ²° μ€μ (~/.dbt/profiles.yml)
βββ models/ # SQL λͺ¨λΈ
β βββ staging/ # μ€ν
μ΄μ§ λͺ¨λΈ
β βββ intermediate/ # μ€κ° λͺ¨λΈ
β βββ marts/ # μ΅μ’
λͺ¨λΈ
βββ tests/ # 컀μ€ν
ν
μ€νΈ
βββ macros/ # μ¬μ¬μ© λ§€ν¬λ‘
βββ seeds/ # μλ λ°μ΄ν° (CSV)
βββ snapshots/ # SCD μ€λ
μ·
βββ analyses/ # λΆμ 쿼리
βββ target/ # μ»΄νμΌλ κ²°κ³Ό
2.2 μ€μ νμΌ¶
# dbt_project.yml
name: 'my_project'
version: '1.0.0'
config-version: 2
profile: 'my_project'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
- "target"
- "dbt_packages"
# λͺ¨λΈλ³ μ€μ
models:
my_project:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral
marts:
+materialized: table
+schema: marts
# profiles.yml (~/.dbt/profiles.yml)
my_project:
target: dev
outputs:
dev:
type: postgres
host: localhost
port: 5432
user: postgres
password: "{{ env_var('DB_PASSWORD') }}"
dbname: analytics
schema: dbt_dev
threads: 4
prod:
type: postgres
host: prod-db.example.com
port: 5432
user: "{{ env_var('PROD_USER') }}"
password: "{{ env_var('PROD_PASSWORD') }}"
dbname: analytics
schema: dbt_prod
threads: 8
3. λͺ¨λΈ (Models)¶
3.1 κΈ°λ³Έ λͺ¨λΈ¶
-- models/staging/stg_orders.sql
-- μ€ν
μ΄μ§ λͺ¨λΈ: μμ€ λ°μ΄ν° μ μ
SELECT
order_id,
customer_id,
CAST(order_date AS DATE) AS order_date,
CAST(amount AS DECIMAL(10, 2)) AS amount,
status,
CURRENT_TIMESTAMP AS loaded_at
FROM {{ source('raw', 'orders') }}
WHERE order_id IS NOT NULL
-- models/staging/stg_customers.sql
SELECT
customer_id,
TRIM(first_name) AS first_name,
TRIM(last_name) AS last_name,
LOWER(email) AS email,
created_at
FROM {{ source('raw', 'customers') }}
-- models/marts/core/fct_orders.sql
-- ν©νΈ ν
μ΄λΈ: μ£Όλ¬Έ
{{
config(
materialized='table',
unique_key='order_id',
partition_by={
'field': 'order_date',
'data_type': 'date',
'granularity': 'month'
}
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
SELECT * FROM {{ ref('stg_customers') }}
)
SELECT
o.order_id,
o.customer_id,
c.first_name,
c.last_name,
c.email,
o.order_date,
o.amount,
o.status,
-- νμ 컬λΌ
EXTRACT(YEAR FROM o.order_date) AS order_year,
EXTRACT(MONTH FROM o.order_date) AS order_month,
CASE
WHEN o.amount > 1000 THEN 'high'
WHEN o.amount > 100 THEN 'medium'
ELSE 'low'
END AS order_tier
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
3.2 μμ€ μ μ¶
# models/staging/_sources.yml
version: 2
sources:
- name: raw
description: "μλ³Έ λ°μ΄ν° μμ€"
database: raw_db
schema: public
tables:
- name: orders
description: "μ£Όλ¬Έ μλ³Έ ν
μ΄λΈ"
columns:
- name: order_id
description: "μ£Όλ¬Έ κ³ μ ID"
tests:
- unique
- not_null
- name: customer_id
description: "κ³ κ° ID"
- name: amount
description: "μ£Όλ¬Έ κΈμ‘"
- name: customers
description: "κ³ κ° μλ³Έ ν
μ΄λΈ"
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
loaded_at_field: updated_at
3.3 Materialization μ ν¶
-- View (κΈ°λ³Έκ°): λ§€λ² μΏΌλ¦¬ μ€ν
{{ config(materialized='view') }}
-- Table: 물리μ ν
μ΄λΈ μμ±
{{ config(materialized='table') }}
-- Incremental: μ¦λΆ μ²λ¦¬
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='append_new_columns'
) }}
SELECT *
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
-- Ephemeral: CTEλ‘ μΈλΌμΈ (ν
μ΄λΈ λ―Έμμ±)
{{ config(materialized='ephemeral') }}
4. ν μ€νΈ¶
4.1 μ€ν€λ§ ν μ€νΈ¶
# models/marts/core/_schema.yml
version: 2
models:
- name: fct_orders
description: "μ£Όλ¬Έ ν©νΈ ν
μ΄λΈ"
columns:
- name: order_id
description: "μ£Όλ¬Έ κ³ μ ID"
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: amount
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: status
tests:
- accepted_values:
values: ['pending', 'completed', 'cancelled', 'refunded']
4.2 컀μ€ν ν μ€νΈ¶
-- tests/assert_positive_amounts.sql
-- λͺ¨λ μ£Όλ¬Έ κΈμ‘μ΄ μμμΈμ§ νμΈ
SELECT
order_id,
amount
FROM {{ ref('fct_orders') }}
WHERE amount < 0
-- macros/test_row_count_equal.sql
{% test row_count_equal(model, compare_model) %}
WITH model_count AS (
SELECT COUNT(*) AS cnt FROM {{ model }}
),
compare_count AS (
SELECT COUNT(*) AS cnt FROM {{ compare_model }}
)
SELECT
m.cnt AS model_count,
c.cnt AS compare_count
FROM model_count m
CROSS JOIN compare_count c
WHERE m.cnt != c.cnt
{% endtest %}
4.3 ν μ€νΈ μ€ν¶
# λͺ¨λ ν
μ€νΈ μ€ν
dbt test
# νΉμ λͺ¨λΈ ν
μ€νΈ
dbt test --select fct_orders
# μμ€ freshness ν
μ€νΈ
dbt source freshness
5. λ¬Έμν¶
5.1 λ¬Έμ μ μ¶
# models/marts/core/_schema.yml
version: 2
models:
- name: fct_orders
description: |
## μ£Όλ¬Έ ν©νΈ ν
μ΄λΈ
μ΄ ν
μ΄λΈμ λͺ¨λ μ£Όλ¬Έ νΈλμμ
μ ν¬ν¨ν©λλ€.
### μ¬μ© μ¬λ‘
- μΌλ³/μλ³ λ§€μΆ λΆμ
- κ³ κ° κ΅¬λ§€ ν¨ν΄ λΆμ
- μ¬κ΅¬λ§€μ¨ κ³μ°
### κ°±μ μ£ΌκΈ°
- λ§€μΌ 06:00 UTC
meta:
owner: "data-team@company.com"
contains_pii: false
columns:
- name: order_id
description: "μ£Όλ¬Έ κ³ μ μλ³μ (UUID)"
meta:
dimension: true
- name: amount
description: "μ£Όλ¬Έ μ΄μ‘ (USD)"
meta:
measure: true
aggregation: sum
5.2 λ¬Έμ μμ± λ° μλΉ¶
# λ¬Έμ μμ±
dbt docs generate
# λ¬Έμ μλ² μ€ν
dbt docs serve --port 8080
# http://localhost:8080 μμ νμΈ
6. Jinja ν νλ¦Ώ¶
6.1 κΈ°λ³Έ Jinja λ¬Έλ²¶
-- λ³μ
{% set my_var = 'value' %}
SELECT '{{ my_var }}' AS col
-- 쑰건문
SELECT
CASE
{% if target.name == 'prod' %}
WHEN amount > 1000 THEN 'high'
{% else %}
WHEN amount > 100 THEN 'high'
{% endif %}
ELSE 'low'
END AS tier
FROM orders
-- λ°λ³΅λ¬Έ
SELECT
order_id,
{% for col in ['amount', 'quantity', 'discount'] %}
SUM({{ col }}) AS total_{{ col }}{% if not loop.last %},{% endif %}
{% endfor %}
FROM order_items
GROUP BY order_id
6.2 Macros¶
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name }}
{%- endif -%}
{%- endmacro %}
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev() %}
{% if target.name == 'dev' %}
LIMIT 1000
{% endif %}
{% endmacro %}
-- λ§€ν¬λ‘ μ¬μ©
SELECT
order_id,
{{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM orders
{{ limit_data_in_dev() }}
6.3 dbt λ΄μ₯ ν¨μ¶
-- ref(): λ€λ₯Έ λͺ¨λΈ μ°Έμ‘°
SELECT * FROM {{ ref('stg_orders') }}
-- source(): μμ€ ν
μ΄λΈ μ°Έμ‘°
SELECT * FROM {{ source('raw', 'orders') }}
-- this: νμ¬ λͺ¨λΈ μ°Έμ‘° (incrementalμμ μ μ©)
{% if is_incremental() %}
SELECT MAX(updated_at) FROM {{ this }}
{% endif %}
-- config(): μ€μ κ° μ κ·Ό
{{ config.get('materialized') }}
-- target: νκ² νκ²½ μ 보
{{ target.name }} -- dev, prod
{{ target.schema }} -- dbt_dev
{{ target.type }} -- postgres, snowflake
7. μ¦λΆ μ²λ¦¬ (Incremental)¶
7.1 κΈ°λ³Έ μ¦λΆ λͺ¨λΈ¶
-- models/marts/fct_events.sql
{{
config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='delete+insert'
)
}}
SELECT
event_id,
user_id,
event_type,
event_data,
created_at
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
-- λ§μ§λ§ μ€ν μ΄ν λ°μ΄ν°λ§ μ²λ¦¬
WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
7.2 μ¦λΆ μ λ΅¶
-- Append (κΈ°λ³Έ): μ λ°μ΄ν° μΆκ°λ§
{{ config(
materialized='incremental',
incremental_strategy='append'
) }}
-- Delete+Insert: ν€ κΈ°μ€ μμ ν μ½μ
{{ config(
materialized='incremental',
unique_key='id',
incremental_strategy='delete+insert'
) }}
-- Merge (Snowflake, BigQuery): MERGE λ¬Έ μ¬μ©
{{ config(
materialized='incremental',
unique_key='id',
incremental_strategy='merge',
merge_update_columns=['name', 'amount', 'updated_at']
) }}
8. μ€ν λͺ λ Ή¶
8.1 κΈ°λ³Έ λͺ λ Ή¶
# μ°κ²° ν
μ€νΈ
dbt debug
# λͺ¨λ λͺ¨λΈ μ€ν
dbt run
# νΉμ λͺ¨λΈλ§ μ€ν
dbt run --select fct_orders
dbt run --select staging.*
dbt run --select +fct_orders+ # μμ‘΄μ± ν¬ν¨
# ν
μ€νΈ μ€ν
dbt test
# λΉλ (run + test)
dbt build
# Seed λ°μ΄ν° λ‘λ
dbt seed
# μ»΄νμΌλ§ (μ€ν μ ν¨)
dbt compile
# μ 리
dbt clean
8.2 μ νμ (Selectors)¶
# λͺ¨λΈ μ΄λ¦μΌλ‘
dbt run --select my_model
# κ²½λ‘λ‘
dbt run --select models/staging/*
# νκ·Έλ‘
dbt run --select tag:daily
# μμ μμ‘΄μ± ν¬ν¨
dbt run --select +my_model
# νμ μμ‘΄μ± ν¬ν¨
dbt run --select my_model+
# μλ°©ν₯
dbt run --select +my_model+
# νΉμ λͺ¨λΈ μ μΈ
dbt run --exclude my_model
μ°μ΅ λ¬Έμ ¶
λ¬Έμ 1: μ€ν μ΄μ§ λͺ¨λΈ¶
μλ³Έ products ν μ΄λΈμμ stg_products λͺ¨λΈμ μμ±νμΈμ. κ°κ²©μ λ¬λ¬λ‘ λ³ννκ³ NULL κ°μ μ²λ¦¬νμΈμ.
λ¬Έμ 2: μ¦λΆ λͺ¨λΈ¶
μΌλ³ νλ§€ μ§κ³ ν μ΄λΈμ μ¦λΆμΌλ‘ μ²λ¦¬νλ λͺ¨λΈμ μμ±νμΈμ.
λ¬Έμ 3: ν μ€νΈ μμ±¶
fct_sales λͺ¨λΈμ λν ν μ€νΈλ₯Ό μμ±νμΈμ (unique, not_null, κΈμ‘ μμ νμΈ).
μμ½¶
| κ°λ | μ€λͺ |
|---|---|
| Model | SQL κΈ°λ° λ°μ΄ν° λ³ν μ μ |
| Source | μλ³Έ λ°μ΄ν° μ°Έμ‘° |
| ref() | λͺ¨λΈ κ° μ°Έμ‘° (μμ‘΄μ± μλ κ΄λ¦¬) |
| Test | λ°μ΄ν° νμ§ κ²μ¦ |
| Materialization | view, table, incremental, ephemeral |
| Macro | μ¬μ¬μ© κ°λ₯ν SQL ν νλ¦Ώ |