Advanced Airflow

Overview

์ด ๋ฌธ์„œ์—์„œ๋Š” Airflow์˜ ๊ณ ๊ธ‰ ๊ธฐ๋Šฅ์ธ XCom์„ ํ†ตํ•œ ๋ฐ์ดํ„ฐ ๊ณต์œ , ๋™์  DAG ์ƒ์„ฑ, Sensor, Hook, TaskGroup ๋“ฑ์„ ๋‹ค๋ฃน๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ๊ธฐ๋Šฅ์„ ํ™œ์šฉํ•˜๋ฉด ๋” ์œ ์—ฐํ•˜๊ณ  ๊ฐ•๋ ฅํ•œ ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.


1. XCom (Cross-Communication)

1.1 Basic XCom Usage

XCom์€ Task ๊ฐ„์— ์ž‘์€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ณต์œ ํ•˜๋Š” ๋ฉ”์ปค๋‹ˆ์ฆ˜์ž…๋‹ˆ๋‹ค.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def push_data(**kwargs):
    """Push data to XCom."""
    ti = kwargs['ti']

    # Method 1: explicit xcom_push
    ti.xcom_push(key='my_key', value={'status': 'success', 'count': 100})

    # Method 2: return value (stored automatically under key='return_value')
    return {'result': 'completed', 'rows': 500}


def pull_data(**kwargs):
    """Pull data from XCom."""
    ti = kwargs['ti']

    # Method 1: pull by a specific key
    custom_data = ti.xcom_pull(key='my_key', task_ids='push_task')
    print(f"Custom data: {custom_data}")

    # Method 2: pull a task's return value
    return_value = ti.xcom_pull(task_ids='push_task')  # key='return_value' is the default
    print(f"Return value: {return_value}")

    # Method 3: pull from multiple tasks (returns a list in task_ids order)
    multiple_results = ti.xcom_pull(task_ids=['task1', 'task2'])


with DAG('xcom_example', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:

    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_data,
    )

    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_data,
    )

    push_task >> pull_task
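For reference, the TaskFlow API (Airflow 2.0+) handles this XCom plumbing implicitly: a task's return value is pushed, and passing it as an argument to another task pulls it. A minimal sketch of the same flow:

from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id='xcom_taskflow_example', start_date=datetime(2024, 1, 1),
     schedule_interval=None, catchup=False)
def xcom_taskflow():

    @task
    def push_data():
        return {'result': 'completed', 'rows': 500}   # pushed to XCom automatically

    @task
    def pull_data(payload: dict):
        print(f"Received: {payload}")                 # argument is pulled from XCom

    pull_data(push_data())

xcom_taskflow_dag = xcom_taskflow()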

1.2 Using XCom in Jinja Templates

from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Using XCom in a Bash command
bash_task = BashOperator(
    task_id='bash_with_xcom',
    bash_command='echo "Result: {{ ti.xcom_pull(task_ids="push_task") }}"',
)

# Using XCom in SQL
sql_task = PostgresOperator(
    task_id='sql_with_xcom',
    postgres_conn_id='my_postgres',
    sql="""
        INSERT INTO process_log (task_id, result_count, processed_at)
        VALUES (
            'data_load',
            {{ ti.xcom_pull(task_ids='count_task', key='row_count') }},
            NOW()
        );
    """,
)
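Note that BashOperator itself pushes the last line of stdout to XCom (under key='return_value') when do_xcom_push=True, which is the default; a small sketch:

produce = BashOperator(
    task_id='produce',
    bash_command='echo "42"',   # the last line of stdout becomes this task's XCom
)

consume = BashOperator(
    task_id='consume',
    bash_command='echo "Got: {{ ti.xcom_pull(task_ids=\'produce\') }}"',
)

produce >> consume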

1.3 XCom Limitations and Alternatives

# XCom values are stored in the metadata DB, so only small payloads are recommended.
# The practical size limit depends on the backing database (roughly 1 GB on Postgres, 64 KB on MySQL).

# Pattern for handling large data
class LargeDataHandler:
    """Pattern for passing large data between tasks."""

    @staticmethod
    def save_to_storage(data, path: str):
        """Save data to external storage (S3, GCS, ...) and pass only the path via XCom."""
        # Writing to s3:// or gs:// paths requires an fsspec-compatible
        # filesystem package (s3fs, gcsfs)
        data.to_parquet(path)
        return path  # return only the path

    @staticmethod
    def load_from_storage(path: str):
        """Load the data back from the path."""
        import pandas as pd
        return pd.read_parquet(path)


# Usage example
def produce_large_data(**kwargs):
    import pandas as pd

    # Generate a large dataset
    df = pd.DataFrame({'col': range(1000000)})

    # Save to S3 and return only the path (requires s3fs)
    path = f"s3://bucket/data/{kwargs['ds']}/output.parquet"
    df.to_parquet(path)

    return path  # only the path is stored in XCom


def consume_large_data(**kwargs):
    import pandas as pd

    ti = kwargs['ti']
    path = ti.xcom_pull(task_ids='produce_task')

    # ๊ฒฝ๋กœ์—์„œ ๋ฐ์ดํ„ฐ ๋กœ๋“œ
    df = pd.read_parquet(path)
    print(f"Loaded {len(df)} rows from {path}")

2. Dynamic DAG Generation

2.1 Config-Driven Dynamic DAGs

# dags/dynamic_dag_factory.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# ์„ค์ • ์ •์˜
DAG_CONFIGS = [
    {
        'dag_id': 'etl_customers',
        'table': 'customers',
        'schedule': '0 1 * * *',
    },
    {
        'dag_id': 'etl_orders',
        'table': 'orders',
        'schedule': '0 2 * * *',
    },
    {
        'dag_id': 'etl_products',
        'table': 'products',
        'schedule': '0 3 * * *',
    },
]


def create_dag(config: dict) -> DAG:
    """์„ค์ • ๊ธฐ๋ฐ˜์œผ๋กœ DAG ์ƒ์„ฑ"""

    def extract_table(table_name: str, **kwargs):
        print(f"Extracting {table_name} for {kwargs['ds']}")

    def load_table(table_name: str, **kwargs):
        print(f"Loading {table_name} for {kwargs['ds']}")

    dag = DAG(
        dag_id=config['dag_id'],
        schedule_interval=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['dynamic', 'etl'],
    )

    with dag:
        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_table,
            op_kwargs={'table_name': config['table']},
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_table,
            op_kwargs={'table_name': config['table']},
        )

        extract >> load

    return dag


# DAG๋“ค์„ globals()์— ๋“ฑ๋ก (Airflow๊ฐ€ ์ธ์‹ํ•˜๋„๋ก)
for config in DAG_CONFIGS:
    dag_id = config['dag_id']
    globals()[dag_id] = create_dag(config)

2.2 YAML/JSON-Driven Dynamic DAGs

# dags/yaml_driven_dag.py
import yaml
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Load the YAML config
config_path = Path(__file__).parent / 'configs' / 'dag_configs.yaml'

# Example configs/dag_configs.yaml:
"""
dags:
  - id: sales_etl
    schedule: "0 6 * * *"
    tasks:
      - name: extract
        type: python
        function: extract_sales
      - name: transform
        type: python
        function: transform_sales
      - name: load
        type: python
        function: load_sales
"""

def load_config():
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)


def create_task_callable(func_name: str):
    """ํ•จ์ˆ˜๋ช…์œผ๋กœ callable ์ƒ์„ฑ"""
    def task_func(**kwargs):
        print(f"Executing {func_name} for {kwargs['ds']}")
    return task_func


def create_dag_from_yaml(dag_config: dict) -> DAG:
    """YAML ์„ค์ •์œผ๋กœ DAG ์ƒ์„ฑ"""

    dag = DAG(
        dag_id=dag_config['id'],
        schedule_interval=dag_config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )

    with dag:
        tasks = {}
        for task_config in dag_config['tasks']:
            task = PythonOperator(
                task_id=task_config['name'],
                python_callable=create_task_callable(task_config['function']),
            )
            tasks[task_config['name']] = task

        # Chain the tasks sequentially
        task_list = list(tasks.values())
        for i in range(len(task_list) - 1):
            task_list[i] >> task_list[i + 1]

    return dag


# DAG ์ƒ์„ฑ ๋ฐ ๋“ฑ๋ก
try:
    config = load_config()
    for dag_config in config.get('dags', []):
        dag_id = dag_config['id']
        globals()[dag_id] = create_dag_from_yaml(dag_config)
except Exception as e:
    print(f"Error loading DAG config: {e}")

2.3 Dynamic Task Creation

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Tables to process
TABLES = ['users', 'orders', 'products', 'reviews', 'inventory']

with DAG(
    dag_id='dynamic_tasks_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end')

    # ๋™์ ์œผ๋กœ Task ์ƒ์„ฑ
    for table in TABLES:
        def process_table(table_name=table, **kwargs):
            print(f"Processing table: {table_name}")

        task = PythonOperator(
            task_id=f'process_{table}',
            python_callable=process_table,
            op_kwargs={'table_name': table},
        )

        start >> task >> end
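The loop above creates tasks at parse time from a static list. When the work items are only known at runtime, Dynamic Task Mapping (Airflow 2.3+) is an alternative; a minimal sketch, placed inside a with DAG(...) block:

from airflow.decorators import task

@task
def process_one_table(table_name: str):
    print(f"Processing table: {table_name}")

# .expand() creates one mapped task instance per list element at runtime
process_one_table.expand(table_name=TABLES)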

3. Sensor

3.1 Built-in Sensors

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.common.sql.sensors.sql import SqlSensor
from datetime import datetime, timedelta

with DAG('sensor_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:

    # 1. FileSensor - wait for a file to exist
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/data/input/{{ ds }}/data.csv',
        poke_interval=60,           # polling interval (seconds)
        timeout=3600,               # timeout (seconds)
        mode='poke',                # poke or reschedule
    )

    # 2. ExternalTaskSensor - wait for a task in another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        execution_delta=timedelta(hours=0),  # same execution_date
        timeout=7200,
        mode='reschedule',          # release the worker slot between pokes
    )

    # 3. HttpSensor - wait for an HTTP endpoint
    wait_for_api = HttpSensor(
        task_id='wait_for_api',
        http_conn_id='my_api',
        endpoint='/health',
        request_params={},
        response_check=lambda response: response.status_code == 200,
        poke_interval=30,
        timeout=600,
    )

    # 4. SqlSensor - wait for a SQL condition
    wait_for_data = SqlSensor(
        task_id='wait_for_data',
        conn_id='my_postgres',
        sql="""
            SELECT COUNT(*) > 0
            FROM staging_table
            WHERE date = '{{ ds }}'
        """,
        poke_interval=300,
        timeout=3600,
    )

    # 5. TimeDeltaSensor - wait for a time interval
    wait_30_minutes = TimeDeltaSensor(
        task_id='wait_30_minutes',
        delta=timedelta(minutes=30),
    )

3.2 Custom Sensors

from airflow.sensors.base import BaseSensorOperator
import boto3

class S3KeySensorCustom(BaseSensorOperator):
    """Custom sensor that checks whether an S3 key exists."""

    template_fields = ['bucket_key']

    def __init__(
        self,
        bucket_name: str,
        bucket_key: str,
        aws_conn_id: str = 'aws_default',
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.bucket_key = bucket_key
        self.aws_conn_id = aws_conn_id

    def poke(self, context) -> bool:
        """Check the condition; returning True means success."""
        self.log.info(f"Checking for s3://{self.bucket_name}/{self.bucket_key}")

        # Create an S3 client (uses default boto3 credentials here;
        # a fuller implementation would resolve self.aws_conn_id via S3Hook)
        s3 = boto3.client('s3')

        try:
            s3.head_object(Bucket=self.bucket_name, Key=self.bucket_key)
            self.log.info("File found!")
            return True
        except s3.exceptions.ClientError as e:
            if e.response['Error']['Code'] == '404':
                self.log.info("File not found, waiting...")
                return False
            raise


# Usage
wait_for_s3 = S3KeySensorCustom(
    task_id='wait_for_s3_file',
    bucket_name='my-bucket',
    bucket_key='data/{{ ds }}/input.parquet',
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
)
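When the check is this simple, subclassing can be avoided: PythonSensor wraps any callable that returns a boolean. A sketch of the same check (hypothetical bucket/key values, default boto3 credentials):

from airflow.sensors.python import PythonSensor
import boto3

def s3_key_exists(bucket: str, key: str) -> bool:
    """Return True if the object exists."""
    s3 = boto3.client('s3')
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except s3.exceptions.ClientError:
        return False

wait_for_s3_light = PythonSensor(
    task_id='wait_for_s3_light',
    python_callable=s3_key_exists,
    op_kwargs={'bucket': 'my-bucket', 'key': 'data/input.parquet'},
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
)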

3.3 Sensor Modes

# poke vs reschedule mode comparison
sensor_modes = {
    'poke': {
        'description': 'Holds the worker slot while waiting',
        'pros': 'Fast reaction time',
        'cons': 'Wastes worker resources',
        'use_case': 'Short expected wait times'
    },
    'reschedule': {
        'description': 'Releases the worker slot and reschedules itself',
        'pros': 'Efficient use of worker resources',
        'cons': 'Somewhat slower reaction time',
        'use_case': 'Long expected wait times'
    }
}

# Recommended settings
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    poke_interval=300,      # check every 5 minutes
    timeout=86400,          # 24-hour timeout
    mode='reschedule',      # use reschedule for long waits
    soft_fail=True,         # skip (instead of fail) on timeout
)

4. Hooks and Connections

4.1 Setting Up Connections

# Set up a Connection in the Airflow UI or via the CLI
# Admin > Connections > Add

# Adding Connections with the CLI
"""
airflow connections add 'my_postgres' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-port '5432' \
    --conn-login 'user' \
    --conn-password 'password' \
    --conn-schema 'mydb'

airflow connections add 'my_s3' \
    --conn-type 'aws' \
    --conn-extra '{"aws_access_key_id": "xxx", "aws_secret_access_key": "yyy", "region_name": "us-east-1"}'
"""

# Setting a Connection via environment variable
# AIRFLOW_CONN_MY_POSTGRES='postgresql://user:password@localhost:5432/mydb'
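Inside task code, a stored Connection can also be inspected directly with BaseHook.get_connection, without going through a dedicated Hook; a minimal sketch:

from airflow.hooks.base import BaseHook

def inspect_connection(**kwargs):
    conn = BaseHook.get_connection('my_postgres')
    print(conn.host, conn.port, conn.login, conn.schema)  # individual fields
    print(conn.get_uri())          # full connection URI
    print(conn.extra_dejson)       # the extras JSON as a dict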

4.2 Using Hooks

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook

def use_postgres_hook(**kwargs):
    """Using the PostgreSQL hook."""
    hook = PostgresHook(postgres_conn_id='my_postgres')

    # Run SQL
    records = hook.get_records("SELECT * FROM users LIMIT 10")

    # Fetch as a DataFrame
    df = hook.get_pandas_df("SELECT * FROM users")

    # Insert rows
    hook.insert_rows(
        table='users',
        rows=[(1, 'John'), (2, 'Jane')],
        target_fields=['id', 'name']
    )

    # Use the raw connection directly
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute("UPDATE users SET active = true")
    conn.commit()


def use_s3_hook(**kwargs):
    """Using the S3 hook."""
    hook = S3Hook(aws_conn_id='my_s3')

    # Upload a file
    hook.load_file(
        filename='/tmp/data.csv',
        key='data/output.csv',
        bucket_name='my-bucket',
        replace=True
    )

    # Download a file
    hook.download_file(
        key='data/input.csv',
        bucket_name='my-bucket',
        local_path='/tmp/input.csv'
    )

    # List keys
    keys = hook.list_keys(
        bucket_name='my-bucket',
        prefix='data/',
        delimiter='/'
    )


def use_http_hook(**kwargs):
    """HTTP Hook ์‚ฌ์šฉ"""
    hook = HttpHook(http_conn_id='my_api', method='GET')

    response = hook.run(
        endpoint='/api/data',
        headers={'Authorization': 'Bearer token'},
        data={'param': 'value'}
    )

    return response.json()

4.3 Custom Hooks

from airflow.hooks.base import BaseHook
from typing import Any
import requests

class MyCustomHook(BaseHook):
    """์ปค์Šคํ…€ API Hook"""

    conn_name_attr = 'my_custom_conn_id'
    default_conn_name = 'my_custom_default'
    conn_type = 'http'
    hook_name = 'My Custom Hook'

    def __init__(self, my_custom_conn_id: str = default_conn_name):
        super().__init__()
        self.my_custom_conn_id = my_custom_conn_id
        self.base_url = None
        self.api_key = None

    def get_conn(self):
        """Connection ์„ค์ • ๋กœ๋“œ"""
        conn = self.get_connection(self.my_custom_conn_id)
        self.base_url = f"https://{conn.host}"
        self.api_key = conn.password
        return conn

    def make_request(self, endpoint: str, method: str = 'GET', data: dict = None) -> Any:
        """API ์š”์ฒญ"""
        self.get_conn()

        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        url = f"{self.base_url}{endpoint}"

        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            json=data
        )

        response.raise_for_status()
        return response.json()


# Usage
def call_custom_api(**kwargs):
    hook = MyCustomHook(my_custom_conn_id='my_api')
    result = hook.make_request('/users', method='GET')
    return result

5. TaskGroup

5.1 TaskGroup Basics

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG('taskgroup_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:

    start = EmptyOperator(task_id='start')

    # Group related tasks with a TaskGroup
    with TaskGroup(group_id='extract_group') as extract_group:
        extract_users = PythonOperator(
            task_id='extract_users',
            python_callable=lambda: print("Extracting users")
        )
        extract_orders = PythonOperator(
            task_id='extract_orders',
            python_callable=lambda: print("Extracting orders")
        )
        extract_products = PythonOperator(
            task_id='extract_products',
            python_callable=lambda: print("Extracting products")
        )

    with TaskGroup(group_id='transform_group') as transform_group:
        transform_users = PythonOperator(
            task_id='transform_users',
            python_callable=lambda: print("Transforming users")
        )
        transform_orders = PythonOperator(
            task_id='transform_orders',
            python_callable=lambda: print("Transforming orders")
        )

    with TaskGroup(group_id='load_group') as load_group:
        load_warehouse = PythonOperator(
            task_id='load_warehouse',
            python_callable=lambda: print("Loading to warehouse")
        )

    end = EmptyOperator(task_id='end')

    # Dependencies between TaskGroups
    start >> extract_group >> transform_group >> load_group >> end
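Assuming Airflow 2.1+, a group can also be declared with the @task_group decorator instead of the context manager; a sketch:

from airflow.decorators import task, task_group

@task_group(group_id='extract_group_decorated')
def extract_group_decorated():
    @task
    def extract_users():
        print("Extracting users")

    @task
    def extract_orders():
        print("Extracting orders")

    # tasks inside the group run in parallel unless chained
    extract_users()
    extract_orders()

# Inside a DAG: start >> extract_group_decorated() >> end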

5.2 Nested TaskGroups

from airflow.utils.task_group import TaskGroup

with DAG('nested_taskgroup', ...) as dag:

    with TaskGroup(group_id='data_processing') as data_processing:

        with TaskGroup(group_id='source_a') as source_a:
            extract_a = PythonOperator(task_id='extract', ...)
            transform_a = PythonOperator(task_id='transform', ...)
            extract_a >> transform_a

        with TaskGroup(group_id='source_b') as source_b:
            extract_b = PythonOperator(task_id='extract', ...)
            transform_b = PythonOperator(task_id='transform', ...)
            extract_b >> transform_b

        # Join after parallel execution
        join = EmptyOperator(task_id='join')
        [source_a, source_b] >> join

5.3 Dynamic TaskGroups

from airflow.utils.task_group import TaskGroup

SOURCES = ['mysql', 'postgres', 'mongodb']

with DAG('dynamic_taskgroup', ...) as dag:

    start = EmptyOperator(task_id='start')

    task_groups = []
    for source in SOURCES:
        with TaskGroup(group_id=f'process_{source}') as tg:
            extract = PythonOperator(
                task_id='extract',
                python_callable=lambda s=source: print(f"Extract from {s}")
            )
            load = PythonOperator(
                task_id='load',
                python_callable=lambda s=source: print(f"Load {s}")
            )
            extract >> load

        task_groups.append(tg)

    end = EmptyOperator(task_id='end')

    start >> task_groups >> end

6. Branching and Conditional Execution

6.1 BranchPythonOperator

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**kwargs):
    """์กฐ๊ฑด์— ๋”ฐ๋ผ ๋‹ค์Œ Task ์„ ํƒ"""
    ti = kwargs['ti']
    data_count = ti.xcom_pull(task_ids='count_data')

    if data_count > 1000:
        return 'process_large'
    elif data_count > 0:
        return 'process_small'
    else:
        return 'skip_processing'


with DAG('branch_example', ...) as dag:

    count_data = PythonOperator(
        task_id='count_data',
        python_callable=lambda: 500,  # example return value
    )

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    process_large = EmptyOperator(task_id='process_large')
    process_small = EmptyOperator(task_id='process_small')
    skip_processing = EmptyOperator(task_id='skip_processing')

    # Join after the branch
    join = EmptyOperator(
        task_id='join',
        trigger_rule='none_failed_min_one_success'  # run if at least one branch succeeded and none failed
    )

    count_data >> branch >> [process_large, process_small, skip_processing] >> join
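Assuming Airflow 2.3+, the same branch logic can be expressed with the @task.branch decorator, whose return value names the task(s) to follow; a sketch:

from airflow.decorators import task

@task.branch
def choose_branch_decorated(data_count: int):
    if data_count > 1000:
        return 'process_large'
    if data_count > 0:
        return 'process_small'
    return 'skip_processing'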

6.2 ShortCircuitOperator

from airflow.operators.python import ShortCircuitOperator

def check_condition(**kwargs):
    """Check a condition; returning False skips all downstream tasks."""
    ds = kwargs['ds']
    # Skip on weekends
    day_of_week = datetime.strptime(ds, '%Y-%m-%d').weekday()
    return day_of_week < 5  # True only on weekdays


with DAG('shortcircuit_example', ...) as dag:

    check = ShortCircuitOperator(
        task_id='check_weekday',
        python_callable=check_condition,
    )

    # If check returns False, the tasks below are skipped
    process = PythonOperator(task_id='process', ...)
    load = PythonOperator(task_id='load', ...)

    check >> process >> load

Exercises

Exercise 1: Using XCom

Write a DAG in which two tasks each return a number and a third task computes their sum.

Exercise 2: Dynamic DAGs

Write a DAG that dynamically creates ETL tasks for each table in a list (users, orders, products).

Exercise 3: Using Sensors

Write a DAG that waits for a file to appear and then processes it.


Summary

Feature       Description
XCom          Mechanism for sharing data between tasks
Dynamic DAGs  Generate DAGs/tasks dynamically from configuration
Sensor        Operator that waits until a condition is met
Hook          Interface for connecting to external systems
TaskGroup     Groups related tasks for cleaner visualization
Branch        Conditional execution paths

์ฐธ๊ณ  ์ž๋ฃŒ

to navigate between lessons