Apache Airflow 기초

Apache Airflow 기초

κ°œμš”

Apache AirflowλŠ” μ›Œν¬ν”Œλ‘œμš°λ₯Ό ν”„λ‘œκ·Έλž˜λ° λ°©μ‹μœΌλ‘œ μž‘μ„±, μŠ€μΌ€μ€„λ§, λͺ¨λ‹ˆν„°λ§ν•˜λŠ” ν”Œλž«νΌμž…λ‹ˆλ‹€. Python으둜 DAG(Directed Acyclic Graph)λ₯Ό μ •μ˜ν•˜μ—¬ λ³΅μž‘ν•œ 데이터 νŒŒμ΄ν”„λΌμΈμ„ κ΄€λ¦¬ν•©λ‹ˆλ‹€.


1. Airflow μ•„ν‚€ν…μ²˜

1.1 핡심 ꡬ성 μš”μ†Œ

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Airflow Architecture                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                              β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚   β”‚  Web Server β”‚         β”‚  Scheduler  β”‚                   β”‚
β”‚   β”‚    (UI)     β”‚         β”‚  (μŠ€μΌ€μ€„λŸ¬) β”‚                   β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚          β”‚                       β”‚                          β”‚
β”‚          β”‚    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚                          β”‚
β”‚          └───→│  Metadata   β”‚β†β”€β”€β”€β”˜                          β”‚
β”‚               β”‚  Database   β”‚                               β”‚
β”‚               β”‚ (PostgreSQL)β”‚                               β”‚
β”‚               β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                               β”‚
β”‚                      β”‚                                      β”‚
β”‚          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                          β”‚
β”‚          ↓                       ↓                          β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚   β”‚   Worker    β”‚         β”‚   Worker    β”‚                   β”‚
β”‚   β”‚  (Celery)   β”‚         β”‚  (Celery)   β”‚                   β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚                                                              β”‚
β”‚   DAGs Folder: /opt/airflow/dags/                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1.2 ꡬ성 μš”μ†Œ μ—­ν• 

ꡬ성 μš”μ†Œ μ—­ν• 
Web Server UI 제곡, DAG μ‹œκ°ν™”, 둜그 쑰회
Scheduler DAG νŒŒμ‹±, Task μŠ€μΌ€μ€„λ§, μ‹€ν–‰ 트리거
Executor Task μ‹€ν–‰ 방식 κ²°μ • (Local, Celery, K8s)
Worker μ‹€μ œ Task μ‹€ν–‰ (Celery/K8s Executor)
Metadata DB DAG 메타데이터, μ‹€ν–‰ 이λ ₯ μ €μž₯

1.3 Executor μœ ν˜•

# airflow.cfg μ„€μ •
executor_types = {
    "SequentialExecutor": "단일 ν”„λ‘œμ„ΈμŠ€, 개발용",
    "LocalExecutor": "λ©€ν‹°ν”„λ‘œμ„ΈμŠ€, 단일 λ¨Έμ‹ ",
    "CeleryExecutor": "λΆ„μ‚° 처리, ν”„λ‘œλ•μ…˜",
    "KubernetesExecutor": "K8s Pod으둜 μ‹€ν–‰"
}

# ꢌμž₯ μ„€μ •
# 개발: LocalExecutor
# ν”„λ‘œλ•μ…˜: CeleryExecutor λ˜λŠ” KubernetesExecutor

2. μ„€μΉ˜ 및 ν™˜κ²½ μ„€μ •

2.1 Docker Compose μ„€μΉ˜ (ꢌμž₯)

# docker-compose.yaml
version: '3.8'

# Common settings shared by every Airflow service via the YAML anchor below.
x-airflow-common: &airflow-common
  image: apache/airflow:2.7.0
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data

  redis:
    # Pin a specific major version; `latest` makes deployments unreproducible.
    image: redis:7

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      # Quote port mappings so YAML always parses them as strings.
      - "8080:8080"
    depends_on:
      - postgres
      - redis

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    depends_on:
      - postgres
      - redis

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    depends_on:
      - airflow-scheduler

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        airflow db init
        airflow users create \
          --username admin \
          --password admin \
          --firstname Admin \
          --lastname User \
          --role Admin \
          --email admin@example.com

volumes:
  postgres-db-volume:

2.2 pip μ„€μΉ˜ (둜컬 개발)

# 가상 ν™˜κ²½ 생성
python -m venv airflow-venv
source airflow-venv/bin/activate

# Airflow μ„€μΉ˜
pip install "apache-airflow[celery,postgres,redis]==2.7.0" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.9.txt"

# μ΄ˆκΈ°ν™”
export AIRFLOW_HOME=~/airflow
airflow db init

# μ‚¬μš©μž 생성
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# μ„œλΉ„μŠ€ μ‹œμž‘
airflow webserver --port 8080 &
airflow scheduler &

3. DAG (Directed Acyclic Graph)

3.1 DAG 기본 ꡬ쑰

# dags/simple_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# DAG 기본 인자
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# DAG μ •μ˜
with DAG(
    dag_id='simple_example_dag',
    default_args=default_args,
    description='κ°„λ‹¨ν•œ 예제 DAG',
    schedule_interval='0 9 * * *',  # 맀일 μ˜€μ „ 9μ‹œ
    start_date=datetime(2024, 1, 1),
    catchup=False,  # κ³Όκ±° μ‹€ν–‰ κ±΄λ„ˆλ›°κΈ°
    tags=['example', 'tutorial'],
) as dag:

    # Task 1: Python ν•¨μˆ˜ μ‹€ν–‰
    def print_hello():
        print("Hello, Airflow!")
        return "Hello returned"

    task_hello = PythonOperator(
        task_id='print_hello',
        python_callable=print_hello,
    )

    # Task 2: Bash λͺ…λ Ή μ‹€ν–‰
    task_date = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # Task 3: Python ν•¨μˆ˜ (인자 전달)
    def greet(name, **kwargs):
        execution_date = kwargs['ds']
        print(f"Hello, {name}! Today is {execution_date}")

    task_greet = PythonOperator(
        task_id='greet_user',
        python_callable=greet,
        op_kwargs={'name': 'Data Engineer'},
    )

    # Task μ˜μ‘΄μ„± μ •μ˜
    task_hello >> task_date >> task_greet
    # λ˜λŠ”: task_hello.set_downstream(task_date)

3.2 DAG λ§€κ°œλ³€μˆ˜

from airflow import DAG

dag = DAG(
    # ν•„μˆ˜ λ§€κ°œλ³€μˆ˜
    dag_id='my_dag',                    # 고유 μ‹λ³„μž
    start_date=datetime(2024, 1, 1),    # μ‹œμž‘ λ‚ μ§œ

    # μŠ€μΌ€μ€„ κ΄€λ ¨
    schedule_interval='@daily',         # μ‹€ν–‰ μ£ΌκΈ°
    # schedule_interval='0 0 * * *'     # Cron ν‘œν˜„μ‹
    # schedule_interval=timedelta(days=1)

    # μ‹€ν–‰ μ œμ–΄
    catchup=False,                      # κ³Όκ±° μ‹€ν–‰ μ—¬λΆ€
    max_active_runs=1,                  # λ™μ‹œ μ‹€ν–‰ μ œν•œ
    max_active_tasks=10,                # λ™μ‹œ Task μ œν•œ

    # 기타
    default_args=default_args,          # 기본 인자
    description='DAG μ„€λͺ…',
    tags=['production', 'etl'],
    doc_md="""
    ## DAG λ¬Έμ„œ
    이 DAGλŠ” 일일 ETL을 μˆ˜ν–‰ν•©λ‹ˆλ‹€.
    """
)

# μŠ€μΌ€μ€„ 프리셋
schedule_presets = {
    '@once': 'ν•œ 번만 μ‹€ν–‰',
    '@hourly': 'λ§€μ‹œκ°„ (0 * * * *)',
    '@daily': '맀일 μžμ • (0 0 * * *)',
    '@weekly': 'λ§€μ£Ό μΌμš”μΌ (0 0 * * 0)',
    '@monthly': 'λ§€μ›” 1일 (0 0 1 * *)',
    '@yearly': 'λ§€λ…„ 1μ›” 1일 (0 0 1 1 *)',
    None: 'μˆ˜λ™ μ‹€ν–‰λ§Œ'
}

4. Operator μœ ν˜•

4.1 μ£Όμš” Operator

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.email import EmailOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

# 1. PythonOperator - run a Python callable
def my_function(arg1, arg2):
    return arg1 + arg2

# op_args and op_kwargs are merged into one call, so each parameter must be
# bound exactly once. (The original example bound arg1 both positionally and
# by keyword, which raises TypeError at runtime.)
python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_args=[1],                 # positional args -> arg1
    op_kwargs={'arg2': 2},       # keyword args -> arg2
)


# 2. BashOperator - run a Bash command
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello" && date',
    env={'MY_VAR': 'value'},     # environment variables
    cwd='/tmp',                  # working directory
)


# 3. EmptyOperator - no-op task (used to group dependencies)
start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')


# 4. PostgresOperator - run SQL against a configured connection
sql_task = PostgresOperator(
    task_id='sql_task',
    postgres_conn_id='my_postgres',
    sql="""
        INSERT INTO logs (message, created_at)
        VALUES ('Task executed', NOW());
    """,
)


# 5. EmailOperator - send an email
email_task = EmailOperator(
    task_id='send_email',
    to='user@example.com',
    subject='Airflow Notification',
    html_content='<h1>Task completed!</h1>',
)


# 6. SimpleHttpOperator - make an HTTP request; response_check marks the
# task failed when the predicate returns False.
http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='my_api',
    endpoint='/api/data',
    method='GET',
    response_check=lambda response: response.status_code == 200,
)

4.2 브랜치 Operator

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

# NOTE(review): also requires `from datetime import datetime` — not shown here.
def choose_branch(**kwargs):
    """Return the task_id to run, based on the logical date's weekday."""
    execution_date = kwargs['ds']
    day_of_week = datetime.strptime(execution_date, '%Y-%m-%d').weekday()

    if day_of_week < 5:  # weekday (Mon=0 .. Fri=4)
        return 'weekday_task'
    else:  # weekend
        return 'weekend_task'

with DAG('branch_example', ...) as dag:

    branch_task = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    weekday_task = EmptyOperator(task_id='weekday_task')
    weekend_task = EmptyOperator(task_id='weekend_task')
    # join fires when no upstream failed and at least one succeeded, so the
    # branch that was skipped does not block it.
    join_task = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch_task >> [weekday_task, weekend_task] >> join_task

4.3 μ»€μŠ€ν…€ Operator

from airflow.models import BaseOperator
from typing import Any

class MyCustomOperator(BaseOperator):
    """Example custom operator.

    `param` is listed in `template_fields`, so Jinja expressions in it are
    rendered before `execute` runs. The value returned from `execute` is
    pushed to XCom automatically.
    """

    template_fields = ['param']  # fields rendered with Jinja templates

    # NOTE: the `@apply_defaults` decorator (and its import from
    # airflow.utils.decorators) was deprecated in Airflow 2.0 and removed in
    # later 2.x releases — BaseOperator applies default_args automatically,
    # so a plain __init__ is correct for the apache/airflow:2.7.0 image
    # used in this lesson.
    def __init__(
        self,
        param: str,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.param = param

    def execute(self, context: dict) -> Any:
        """Task execution logic."""
        self.log.info(f"Executing with param: {self.param}")

        # Access run information from the task context
        execution_date = context['ds']
        task_instance = context['ti']

        # Business logic
        result = f"Processed {self.param} on {execution_date}"

        # Return value becomes an XCom
        return result


# Usage
custom_task = MyCustomOperator(
    task_id='custom_task',
    param='my_value',
)

5. Task μ˜μ‘΄μ„±

5.1 μ˜μ‘΄μ„± μ •μ˜ 방법

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG('dependency_example', ...) as dag:

    task_a = EmptyOperator(task_id='task_a')
    task_b = EmptyOperator(task_id='task_b')
    task_c = EmptyOperator(task_id='task_c')
    task_d = EmptyOperator(task_id='task_d')
    task_e = EmptyOperator(task_id='task_e')

    # Method 1: >> operator (preferred)
    task_a >> task_b >> task_c

    # Method 2: << operator (reverse direction)
    task_c << task_b << task_a  # same as above

    # Method 3: set_downstream / set_upstream
    task_a.set_downstream(task_b)
    task_b.set_downstream(task_c)

    # Parallel execution: b and c run concurrently between a and d
    task_a >> [task_b, task_c] >> task_d

    # Complex dependencies
    #     ┌→ B ─┐
    # A ───     ├──→ E
    #     └→ C → D ─┘

    task_a >> task_b >> task_e
    task_a >> task_c >> task_d >> task_e

5.2 Trigger Rule

from airflow.utils.trigger_rule import TriggerRule

# Trigger Rule μœ ν˜•
trigger_rules = {
    'all_success': 'λͺ¨λ“  μƒμœ„ Task 성곡 (κΈ°λ³Έκ°’)',
    'all_failed': 'λͺ¨λ“  μƒμœ„ Task μ‹€νŒ¨',
    'all_done': 'λͺ¨λ“  μƒμœ„ Task μ™„λ£Œ (성곡/μ‹€νŒ¨ 무관)',
    'one_success': 'ν•˜λ‚˜ 이상 성곡',
    'one_failed': 'ν•˜λ‚˜ 이상 μ‹€νŒ¨',
    'none_failed': 'μ‹€νŒ¨ μ—†μŒ (μŠ€ν‚΅ ν—ˆμš©)',
    'none_failed_min_one_success': 'μ‹€νŒ¨ μ—†κ³  μ΅œμ†Œ ν•˜λ‚˜ 성곡',
    'none_skipped': 'μŠ€ν‚΅ μ—†μŒ',
    'always': '항상 μ‹€ν–‰',
}

# μ‚¬μš© μ˜ˆμ‹œ
task_join = EmptyOperator(
    task_id='join',
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

# μ—λŸ¬ 핸듀링 Task
task_error_handler = EmptyOperator(
    task_id='error_handler',
    trigger_rule=TriggerRule.ONE_FAILED,
)

6. μŠ€μΌ€μ€„λ§

6.1 Cron ν‘œν˜„μ‹

# Cron ν˜•μ‹: λΆ„ μ‹œ 일 μ›” μš”μΌ
cron_examples = {
    '0 0 * * *': '맀일 μžμ •',
    '0 9 * * 1-5': '평일 μ˜€μ „ 9μ‹œ',
    '0 */2 * * *': '2μ‹œκ°„λ§ˆλ‹€',
    '30 8 1 * *': 'λ§€μ›” 1일 μ˜€μ „ 8:30',
    '0 0 * * 0': 'λ§€μ£Ό μΌμš”μΌ μžμ •',
}

# DAGμ—μ„œ μ‚¬μš©
dag = DAG(
    dag_id='scheduled_dag',
    schedule_interval='0 9 * * 1-5',  # 평일 μ˜€μ „ 9μ‹œ
    start_date=datetime(2024, 1, 1),
    ...
)

6.2 데이터 간격 (Data Interval)

# Airflow 2.0+ data-interval concept
"""
schedule_interval = @daily, start_date = 2024-01-01

실행 시점: 2024-01-02 00:00
data_interval_start: 2024-01-01 00:00
data_interval_end: 2024-01-02 00:00
logical_date (execution_date): 2024-01-01 00:00

→ 2024-01-01 데이터를 처리하기 위해 2024-01-02에 실행
"""

def process_daily_data(**kwargs):
    # Bounds of the data interval this run is responsible for (from context)
    data_interval_start = kwargs['data_interval_start']
    data_interval_end = kwargs['data_interval_end']

    print(f"Processing data from {data_interval_start} to {data_interval_end}")

# Jinja-templated SQL: interval bounds are rendered at run time
sql_task = PostgresOperator(
    task_id='load_data',
    sql="""
        SELECT * FROM sales
        WHERE sale_date >= '{{ data_interval_start }}'
          AND sale_date < '{{ data_interval_end }}'
    """,
)

7. κΈ°λ³Έ DAG μž‘μ„± 예제

7.1 일일 ETL DAG

# dags/daily_etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data_team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
}

def extract_data(**kwargs):
    """데이터 μΆ”μΆœ"""
    import pandas as pd

    ds = kwargs['ds']  # execution date (YYYY-MM-DD)

    # μ†ŒμŠ€μ—μ„œ 데이터 μΆ”μΆœ
    query = f"""
        SELECT * FROM source_table
        WHERE date = '{ds}'
    """

    # df = pd.read_sql(query, source_conn)
    # df.to_parquet(f'/tmp/extract_{ds}.parquet')

    print(f"Extracted data for {ds}")
    return f"/tmp/extract_{ds}.parquet"


def transform_data(**kwargs):
    """데이터 λ³€ν™˜"""
    import pandas as pd

    ti = kwargs['ti']
    extract_path = ti.xcom_pull(task_ids='extract')

    # df = pd.read_parquet(extract_path)
    # λ³€ν™˜ 둜직
    # df['new_column'] = df['column'].apply(transform_func)
    # df.to_parquet(f'/tmp/transform_{kwargs["ds"]}.parquet')

    print("Data transformed")
    return f"/tmp/transform_{kwargs['ds']}.parquet"


with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='일일 ETL νŒŒμ΄ν”„λΌμΈ',
    schedule_interval='0 6 * * *',  # 맀일 μ˜€μ „ 6μ‹œ
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'daily', 'production'],
) as dag:

    start = EmptyOperator(task_id='start')

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='warehouse',
        sql="""
            COPY target_table FROM '/tmp/transform_{{ ds }}.parquet'
            WITH (FORMAT 'parquet');
        """,
    )

    validate = PostgresOperator(
        task_id='validate',
        postgres_conn_id='warehouse',
        sql="""
            SELECT
                CASE WHEN COUNT(*) > 0 THEN 1
                     ELSE 1/0  -- μ—λŸ¬ λ°œμƒ
                END
            FROM target_table
            WHERE date = '{{ ds }}';
        """,
    )

    end = EmptyOperator(task_id='end')

    # μ˜μ‘΄μ„± μ •μ˜
    start >> extract >> transform >> load >> validate >> end

μ—°μŠ΅ 문제

문제 1: κΈ°λ³Έ DAG μž‘μ„±

λ§€μ‹œκ°„ μ‹€ν–‰λ˜λŠ” DAGλ₯Ό μž‘μ„±ν•˜μ„Έμš”. ν˜„μž¬ μ‹œκ°„μ„ λ‘œκ·Έμ— 좜λ ₯ν•˜κ³ , μž„μ‹œ νŒŒμΌμ„ μƒμ„±ν•˜λŠ” 두 개의 Taskλ₯Ό 포함해야 ν•©λ‹ˆλ‹€.

문제 2: 쑰건뢀 μ‹€ν–‰

평일과 주말에 λ‹€λ₯Έ Taskλ₯Ό μ‹€ν–‰ν•˜λŠ” BranchPythonOperatorλ₯Ό μ‚¬μš©ν•œ DAGλ₯Ό μž‘μ„±ν•˜μ„Έμš”.


μš”μ•½

κ°œλ… μ„€λͺ…
DAG Task의 μ˜μ‘΄μ„±μ„ μ •μ˜ν•œ λ°©ν–₯μ„± λΉ„μˆœν™˜ κ·Έλž˜ν”„
Operator Task의 μ‹€ν–‰ μœ ν˜• (Python, Bash, SQL λ“±)
Task DAG λ‚΄μ˜ κ°œλ³„ μž‘μ—… λ‹¨μœ„
Scheduler DAG νŒŒμ‹± 및 Task μŠ€μΌ€μ€„λ§
Executor Task μ‹€ν–‰ 방식 (Local, Celery, K8s)

참고 자료

레슨 간 이동은 탐색 링크를 사용하세요. (Use the navigation links to move between lessons.)