Apache Airflow Basics
Overview

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. You define DAGs (Directed Acyclic Graphs) in Python to manage complex data pipelines.
1. Airflow Architecture

1.1 Core Components
┌────────────────────────────────────────────────────────────────┐
│                      Airflow Architecture                      │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│    ┌─────────────┐             ┌─────────────┐                 │
│    │ Web Server  │             │  Scheduler  │                 │
│    │    (UI)     │             │             │                 │
│    └──────┬──────┘             └──────┬──────┘                 │
│           │                           │                        │
│           │       ┌─────────────┐     │                        │
│           └──────▶│  Metadata   │◀────┘                        │
│                   │  Database   │                              │
│                   │ (PostgreSQL)│                              │
│                   └──────┬──────┘                              │
│                          │                                     │
│             ┌────────────┴────────────┐                        │
│             │                         │                        │
│      ┌─────────────┐           ┌─────────────┐                 │
│      │   Worker    │           │   Worker    │                 │
│      │  (Celery)   │           │  (Celery)   │                 │
│      └─────────────┘           └─────────────┘                 │
│                                                                │
│    DAGs Folder: /opt/airflow/dags/                             │
└────────────────────────────────────────────────────────────────┘
1.2 Component Roles

| Component | Role |
|---|---|
| Web Server | Serves the UI: DAG visualization, log viewing |
| Scheduler | Parses DAGs, schedules tasks, triggers runs |
| Executor | Determines how tasks are executed (Local, Celery, K8s) |
| Worker | Runs the actual tasks (Celery/K8s executors) |
| Metadata DB | Stores DAG metadata and run history |
1.3 Choosing an Executor

# The executor is configured in airflow.cfg
executor_types = {
    "SequentialExecutor": "single process, for development",
    "LocalExecutor": "multiple processes, single machine",
    "CeleryExecutor": "distributed execution, production",
    "KubernetesExecutor": "runs each task in its own K8s Pod",
}

# Recommended settings
# Development: LocalExecutor
# Production: CeleryExecutor or KubernetesExecutor
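To check which executor a given environment is actually running, you can query Airflow's configuration accessor directly; a minimal sketch, assuming a local Airflow installation:

```python
# Read the configured executor at runtime via Airflow's config accessor.
from airflow.configuration import conf

# Resolves [core] executor from airflow.cfg and any AIRFLOW__CORE__EXECUTOR override
print(conf.get("core", "executor"))  # e.g. "LocalExecutor"
```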
2. Installation and Environment Setup

2.1 Docker Compose Setup (Recommended)
# docker-compose.yaml
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.7.0
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data

  redis:
    image: redis:latest

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - postgres
      - redis

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    depends_on:
      - postgres
      - redis

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    depends_on:
      - airflow-scheduler

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        airflow db init
        airflow users create \
          --username admin \
          --password admin \
          --firstname Admin \
          --lastname User \
          --role Admin \
          --email admin@example.com

volumes:
  postgres-db-volume:
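With this file saved as docker-compose.yaml, the bring-up pattern from the official guide is to run the one-off init service first (`docker compose up airflow-init`) and then start the stack with `docker compose up -d`; the UI is then reachable at http://localhost:8080 with the admin/admin account created above. Note that the official compose file also declares healthchecks and `depends_on` entries for the init service, which this abridged version omits.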
2.2 pip Installation (Local Development)

# Create a virtual environment
python -m venv airflow-venv
source airflow-venv/bin/activate

# Install Airflow
pip install "apache-airflow[celery,postgres,redis]==2.7.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.9.txt"

# Initialize the metadata database
export AIRFLOW_HOME=~/airflow
airflow db init

# Create an admin user
airflow users create \
  --username admin \
  --password admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# Start the services
airflow webserver --port 8080 &
airflow scheduler &
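For quick local experiments, Airflow 2.x also ships an `airflow standalone` command that initializes the database, creates a login, and starts the webserver and scheduler in one step.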
3. DAG (Directed Acyclic Graph)

3.1 Basic DAG Structure

# dags/simple_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# DAG definition
with DAG(
    dag_id='simple_example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval='0 9 * * *',  # every day at 09:00
    start_date=datetime(2024, 1, 1),
    catchup=False,  # skip backfilling past runs
    tags=['example', 'tutorial'],
) as dag:

    # Task 1: run a Python function
    def print_hello():
        print("Hello, Airflow!")
        return "Hello returned"

    task_hello = PythonOperator(
        task_id='print_hello',
        python_callable=print_hello,
    )

    # Task 2: run a Bash command
    task_date = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # Task 3: Python function with arguments
    def greet(name, **kwargs):
        execution_date = kwargs['ds']
        print(f"Hello, {name}! Today is {execution_date}")

    task_greet = PythonOperator(
        task_id='greet_user',
        python_callable=greet,
        op_kwargs={'name': 'Data Engineer'},
    )

    # Define task dependencies
    task_hello >> task_date >> task_greet
    # equivalently: task_hello.set_downstream(task_date)
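The same pipeline can be written more compactly with the TaskFlow API introduced in Airflow 2.0. A minimal sketch of the decorator style, reusing the schedule and dates from the example above:

```python
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    schedule_interval='0 9 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['example', 'tutorial'],
)
def simple_example_taskflow():
    @task
    def print_hello():
        print("Hello, Airflow!")

    @task
    def greet(name: str, ds=None):  # ds is injected from the run context
        print(f"Hello, {name}! Today is {ds}")

    # Calling a @task-decorated function creates the task; >> still sets ordering
    print_hello() >> greet('Data Engineer')

simple_example_taskflow()
```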
3.2 DAG Parameters

from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    # Required parameters
    dag_id='my_dag',                   # unique identifier
    start_date=datetime(2024, 1, 1),   # first schedulable date

    # Scheduling
    schedule_interval='@daily',        # run frequency
    # schedule_interval='0 0 * * *'    # cron expression
    # schedule_interval=timedelta(days=1)

    # Run control
    catchup=False,        # whether to backfill past runs
    max_active_runs=1,    # concurrent DAG-run limit
    max_active_tasks=10,  # concurrent task limit

    # Miscellaneous
    default_args=default_args,  # shared task defaults
    description='DAG description',
    tags=['production', 'etl'],
    doc_md="""
    ## DAG documentation
    This DAG performs a daily ETL run.
    """
)

# Schedule presets
schedule_presets = {
    '@once': 'run exactly once',
    '@hourly': 'every hour (0 * * * *)',
    '@daily': 'every day at midnight (0 0 * * *)',
    '@weekly': 'every Sunday (0 0 * * 0)',
    '@monthly': 'first day of every month (0 0 1 * *)',
    '@yearly': 'every January 1st (0 0 1 1 *)',
    None: 'manual triggering only',
}
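Note that Airflow 2.4 introduced the `schedule` parameter as the successor to `schedule_interval`; the older name still works in 2.7 but is deprecated.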
4. Choosing an Operator

4.1 Common Operators

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.email import EmailOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

# 1. PythonOperator - run a Python function
def my_function(arg1, arg2):
    return arg1 + arg2

python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_args=[1, 2],           # positional arguments
    # op_kwargs={'arg1': 1},  # or keyword arguments (don't pass the same
                              # parameter through both op_args and op_kwargs)
)

# 2. BashOperator - run a Bash command
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello" && date',
    env={'MY_VAR': 'value'},  # environment variables
    cwd='/tmp',               # working directory
)

# 3. EmptyOperator - dummy task (for grouping dependencies)
start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')

# 4. PostgresOperator - run SQL
sql_task = PostgresOperator(
    task_id='sql_task',
    postgres_conn_id='my_postgres',
    sql="""
        INSERT INTO logs (message, created_at)
        VALUES ('Task executed', NOW());
    """,
)

# 5. EmailOperator - send an email
email_task = EmailOperator(
    task_id='send_email',
    to='user@example.com',
    subject='Airflow Notification',
    html_content='<h1>Task completed!</h1>',
)

# 6. SimpleHttpOperator - make an HTTP request
http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='my_api',
    endpoint='/api/data',
    method='GET',
    response_check=lambda response: response.status_code == 200,
)
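Operator arguments listed in an operator's `template_fields` (such as `bash_command` and `sql` above) are rendered with Jinja before execution, so run context can be injected without extra Python glue. A small sketch:

```python
from airflow.operators.bash import BashOperator

# bash_command is a template field, so {{ ds }} and {{ dag.dag_id }}
# are rendered from the run context before the command executes
templated_task = BashOperator(
    task_id='templated',
    bash_command='echo "logical date: {{ ds }}, dag: {{ dag.dag_id }}"',
)
```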
4.2 Branching Operators

from datetime import datetime

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**kwargs):
    """Pick which task to run based on a condition."""
    execution_date = kwargs['ds']
    day_of_week = datetime.strptime(execution_date, '%Y-%m-%d').weekday()
    if day_of_week < 5:  # weekday
        return 'weekday_task'
    else:                # weekend
        return 'weekend_task'

with DAG('branch_example', ...) as dag:
    branch_task = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )
    weekday_task = EmptyOperator(task_id='weekday_task')
    weekend_task = EmptyOperator(task_id='weekend_task')
    join_task = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch_task >> [weekday_task, weekend_task] >> join_task
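Since Airflow 2.3 the same branch can also be written with the `@task.branch` decorator, which injects context variables (here `ds`) by parameter name; a minimal sketch that would replace the BranchPythonOperator above:

```python
from datetime import datetime

from airflow.decorators import task

@task.branch(task_id='branch')
def choose_branch_decorated(ds=None):
    """Return the task_id of the branch to follow."""
    weekday = datetime.strptime(ds, '%Y-%m-%d').weekday()
    return 'weekday_task' if weekday < 5 else 'weekend_task'
```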
4.3 Custom Operators

from typing import Any

from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    """An example custom operator."""

    template_fields = ['param']  # fields rendered as Jinja templates

    def __init__(self, param: str, **kwargs):
        # Note: the old @apply_defaults decorator is deprecated in Airflow 2.x;
        # default_args are now applied automatically by BaseOperator.
        super().__init__(**kwargs)
        self.param = param

    def execute(self, context: dict) -> Any:
        """Task execution logic."""
        self.log.info(f"Executing with param: {self.param}")

        # Access run information from the context
        execution_date = context['ds']
        task_instance = context['ti']

        # Business logic
        result = f"Processed {self.param} on {execution_date}"

        # The return value is pushed to XCom
        return result

# Usage
custom_task = MyCustomOperator(
    task_id='custom_task',
    param='my_value',
)
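Because `param` is listed in `template_fields`, callers can pass Jinja that is rendered per run; a usage sketch (the value is purely illustrative):

```python
# 'report_{{ ds }}' is rendered to e.g. 'report_2024-01-01' before execute() runs
templated_custom = MyCustomOperator(
    task_id='templated_custom',
    param='report_{{ ds }}',
)
```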
5. Task Dependencies

5.1 Defining Dependencies

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG('dependency_example', ...) as dag:
    task_a = EmptyOperator(task_id='task_a')
    task_b = EmptyOperator(task_id='task_b')
    task_c = EmptyOperator(task_id='task_c')
    task_d = EmptyOperator(task_id='task_d')
    task_e = EmptyOperator(task_id='task_e')

    # Method 1: the >> operator (recommended)
    task_a >> task_b >> task_c

    # Method 2: the << operator (reverse direction)
    task_c << task_b << task_a  # same as above

    # Method 3: set_downstream / set_upstream
    task_a.set_downstream(task_b)
    task_b.set_downstream(task_c)

    # Parallel execution
    task_a >> [task_b, task_c] >> task_d

    # Complex dependencies
    #      ┌─> B ────────┐
    # A ───┤             ├──> E
    #      └─> C ──> D ──┘
    task_a >> task_b >> task_e
    task_a >> task_c >> task_d >> task_e
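For long linear chains, the `chain()` helper from `airflow.models.baseoperator` expresses the same dependencies without repeating `>>`; a minimal sketch equivalent to Method 1 above:

```python
from airflow.models.baseoperator import chain

# Equivalent to: task_a >> task_b >> task_c
chain(task_a, task_b, task_c)
```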
5.2 Trigger Rules

from airflow.utils.trigger_rule import TriggerRule

# Trigger rule types
trigger_rules = {
    'all_success': 'all upstream tasks succeeded (default)',
    'all_failed': 'all upstream tasks failed',
    'all_done': 'all upstream tasks finished (regardless of outcome)',
    'one_success': 'at least one upstream task succeeded',
    'one_failed': 'at least one upstream task failed',
    'none_failed': 'no failures (skips allowed)',
    'none_failed_min_one_success': 'no failures and at least one success',
    'none_skipped': 'no upstream task was skipped',
    'always': 'run unconditionally',
}

# Usage example
task_join = EmptyOperator(
    task_id='join',
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

# Error-handling task
task_error_handler = EmptyOperator(
    task_id='error_handler',
    trigger_rule=TriggerRule.ONE_FAILED,
)
6. Scheduling

6.1 Cron Expressions

# Cron format: minute hour day-of-month month day-of-week
cron_examples = {
    '0 0 * * *': 'every day at midnight',
    '0 9 * * 1-5': 'weekdays at 09:00',
    '0 */2 * * *': 'every 2 hours',
    '30 8 1 * *': '08:30 on the 1st of every month',
    '0 0 * * 0': 'every Sunday at midnight',
}

# Using a cron expression in a DAG
dag = DAG(
    dag_id='scheduled_dag',
    schedule_interval='0 9 * * 1-5',  # weekdays at 09:00
    start_date=datetime(2024, 1, 1),
    ...
)
6.2 Data Intervals

# The data interval concept in Airflow 2.0+
"""
schedule_interval = @daily, start_date = 2024-01-01

Run time:                      2024-01-02 00:00
data_interval_start:           2024-01-01 00:00
data_interval_end:             2024-01-02 00:00
logical_date (execution_date): 2024-01-01 00:00

→ The run at 2024-01-02 processes the data for 2024-01-01.
"""

def process_daily_data(**kwargs):
    # The data range this run is responsible for
    data_interval_start = kwargs['data_interval_start']
    data_interval_end = kwargs['data_interval_end']
    print(f"Processing data from {data_interval_start} to {data_interval_end}")

# Using Jinja templates
sql_task = PostgresOperator(
    task_id='load_data',
    sql="""
        SELECT * FROM sales
        WHERE sale_date >= '{{ data_interval_start }}'
          AND sale_date < '{{ data_interval_end }}'
    """,
)
7. Writing a Basic DAG

7.1 Daily ETL DAG

# dags/daily_etl_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data_team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
}

def extract_data(**kwargs):
    """Extract data from the source system."""
    import pandas as pd

    ds = kwargs['ds']  # execution date (YYYY-MM-DD)

    # Pull the day's rows from the source
    query = f"""
        SELECT * FROM source_table
        WHERE date = '{ds}'
    """
    # df = pd.read_sql(query, source_conn)
    # df.to_parquet(f'/tmp/extract_{ds}.parquet')
    print(f"Extracted data for {ds}")
    return f"/tmp/extract_{ds}.parquet"

def transform_data(**kwargs):
    """Transform the extracted data."""
    import pandas as pd

    ti = kwargs['ti']
    extract_path = ti.xcom_pull(task_ids='extract')

    # df = pd.read_parquet(extract_path)
    # transformation logic
    # df['new_column'] = df['column'].apply(transform_func)
    # Write CSV so plain Postgres COPY can load it
    # (COPY supports text/csv/binary, not Parquet)
    # df.to_csv(f'/tmp/transform_{kwargs["ds"]}.csv', index=False)
    print("Data transformed")
    return f"/tmp/transform_{kwargs['ds']}.csv"

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule_interval='0 6 * * *',  # every day at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'daily', 'production'],
) as dag:

    start = EmptyOperator(task_id='start')

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='warehouse',
        sql="""
            COPY target_table FROM '/tmp/transform_{{ ds }}.csv'
            WITH (FORMAT csv, HEADER true);
        """,
    )

    validate = PostgresOperator(
        task_id='validate',
        postgres_conn_id='warehouse',
        sql="""
            SELECT
                CASE WHEN COUNT(*) > 0 THEN 1
                     ELSE 1/0  -- force an error if no rows were loaded
                END
            FROM target_table
            WHERE date = '{{ ds }}';
        """,
    )

    end = EmptyOperator(task_id='end')

    # Define dependencies
    start >> extract >> transform >> load >> validate >> end
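A single task can be smoke-tested without the scheduler using the CLI, e.g. `airflow tasks test daily_etl_pipeline extract 2024-01-01`, which runs the task in-process and prints its logs to the terminal.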
Practice Problems

Problem 1: Write a basic DAG
Write a DAG that runs every hour. It should contain two tasks: one that logs the current time and one that creates a temporary file.

Problem 2: Conditional execution
Write a DAG that uses BranchPythonOperator to run different tasks on weekdays and weekends.
Summary

| Concept | Description |
|---|---|
| DAG | Directed acyclic graph that defines tasks and their dependencies |
| Operator | The execution type of a task (Python, Bash, SQL, etc.) |
| Task | An individual unit of work within a DAG |
| Scheduler | Parses DAGs and schedules tasks |
| Executor | How tasks are executed (Local, Celery, K8s) |