Airflow ์ฌํ
Airflow ์ฌํ¶
๊ฐ์¶
์ด ๋ฌธ์์์๋ Airflow์ ๊ณ ๊ธ ๊ธฐ๋ฅ์ธ XCom์ ํตํ ๋ฐ์ดํฐ ๊ณต์ , ๋์ DAG ์์ฑ, Sensor, Hook, TaskGroup ๋ฑ์ ๋ค๋ฃน๋๋ค. ์ด๋ฌํ ๊ธฐ๋ฅ์ ํ์ฉํ๋ฉด ๋ ์ ์ฐํ๊ณ ๊ฐ๋ ฅํ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ ์ ์์ต๋๋ค.
1. XCom (Cross-Communication)¶
1.1 XCom ๊ธฐ๋ณธ ์ฌ์ฉ๋ฒ¶
XCom์ Task ๊ฐ์ ์์ ๋ฐ์ดํฐ๋ฅผ ๊ณต์ ํ๋ ๋ฉ์ปค๋์ฆ์ ๋๋ค.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_data(**kwargs):
    """Publish values to XCom via both supported mechanisms."""
    task_instance = kwargs['ti']
    # Method 1: explicit xcom_push under a custom key
    task_instance.xcom_push(key='my_key', value={'status': 'success', 'count': 100})
    # Method 2: the return value is stored automatically under key='return_value'
    return {'result': 'completed', 'rows': 500}
def pull_data(**kwargs):
    """Read values from XCom using the three common retrieval patterns."""
    task_instance = kwargs['ti']
    # Pattern 1: pull a value stored under an explicit custom key
    custom_data = task_instance.xcom_pull(key='my_key', task_ids='push_task')
    print(f"Custom data: {custom_data}")
    # Pattern 2: pull the upstream return value (key defaults to 'return_value')
    return_value = task_instance.xcom_pull(task_ids='push_task')
    print(f"Return value: {return_value}")
    # Pattern 3: pull from several tasks at once (yields one value per task)
    multiple_results = task_instance.xcom_pull(task_ids=['task1', 'task2'])
with DAG('xcom_example', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    # Producer / consumer pair; pull_task runs only after push_task finishes.
    push_task = PythonOperator(task_id='push_task', python_callable=push_data)
    pull_task = PythonOperator(task_id='pull_task', python_callable=pull_data)

    push_task >> pull_task
1.2 Jinja ํ ํ๋ฆฟ์์ XCom ์ฌ์ฉ¶
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# Bash์์ XCom ์ฌ์ฉ
bash_task = BashOperator(
task_id='bash_with_xcom',
bash_command='echo "Result: {{ ti.xcom_pull(task_ids="push_task") }}"',
)
# SQL์์ XCom ์ฌ์ฉ
sql_task = PostgresOperator(
task_id='sql_with_xcom',
postgres_conn_id='my_postgres',
sql="""
INSERT INTO process_log (task_id, result_count, processed_at)
VALUES (
'data_load',
{{ ti.xcom_pull(task_ids='count_task', key='row_count') }},
NOW()
);
""",
)
1.3 XCom ์ ํ ์ฌํญ ๋ฐ ๋์¶
# XCom ์ ํ: ๊ธฐ๋ณธ 1GB (DB์ ์ ์ฅ๋๋ฏ๋ก ์์ ๋ฐ์ดํฐ๋ง ๊ถ์ฅ)
# ๋์ฉ๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฐฉ๋ฒ
class LargeDataHandler:
    """Pattern for moving large payloads between tasks.

    XCom rows live in the Airflow metadata database, so only small values
    should travel through XCom.  Large data goes to external storage
    (S3, GCS, ...) and only the storage *path* is exchanged via XCom.
    """

    @staticmethod
    def save_to_storage(data, path: str) -> str:
        """Write *data* (a pandas DataFrame) to *path* and return the path.

        Only the returned path should be pushed to XCom, never the data.
        """
        # Removed an unused `import pandas as pd` — to_parquet is a method
        # of the DataFrame passed in, so pandas is not referenced here.
        data.to_parquet(path)
        return path

    @staticmethod
    def load_from_storage(path: str):
        """Load a DataFrame back from the parquet file at *path*."""
        import pandas as pd
        return pd.read_parquet(path)
# ์ฌ์ฉ ์์
def produce_large_data(**kwargs):
    """Write a large DataFrame to S3 and hand only its path to XCom."""
    import pandas as pd

    frame = pd.DataFrame({'col': range(1000000)})
    # Partition the output by execution date so each run writes its own file.
    target = f"s3://bucket/data/{kwargs['ds']}/output.parquet"
    frame.to_parquet(target)
    # The XCom payload is just the path string, never the data itself.
    return target
def consume_large_data(**kwargs):
    """Resolve the upstream storage path from XCom and load the data."""
    import pandas as pd

    source = kwargs['ti'].xcom_pull(task_ids='produce_task')
    frame = pd.read_parquet(source)
    print(f"Loaded {len(frame)} rows from {source}")
2. ๋์ DAG ์์ฑ¶
2.1 ์ค์ ๊ธฐ๋ฐ ๋์ DAG¶
# dags/dynamic_dag_factory.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# ์ค์ ์ ์
DAG_CONFIGS = [
    # One entry per generated DAG: which table to process and its cron schedule.
    {
        'dag_id': 'etl_customers',
        'table': 'customers',
        'schedule': '0 1 * * *',  # daily at 01:00
    },
    {
        'dag_id': 'etl_orders',
        'table': 'orders',
        'schedule': '0 2 * * *',  # daily at 02:00
    },
    {
        'dag_id': 'etl_products',
        'table': 'products',
        'schedule': '0 3 * * *',  # daily at 03:00
    },
]
def create_dag(config: dict) -> DAG:
    """Build one extract/load DAG from a config entry.

    Expects keys 'dag_id', 'table' and 'schedule' in *config*.
    """

    def _extract(table_name: str, **kwargs):
        # Placeholder extract step; 'ds' is the run's logical date.
        print(f"Extracting {table_name} for {kwargs['ds']}")

    def _load(table_name: str, **kwargs):
        # Placeholder load step.
        print(f"Loading {table_name} for {kwargs['ds']}")

    dag = DAG(
        dag_id=config['dag_id'],
        schedule_interval=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['dynamic', 'etl'],
    )

    with dag:
        extract = PythonOperator(
            task_id='extract',
            python_callable=_extract,
            op_kwargs={'table_name': config['table']},
        )
        load = PythonOperator(
            task_id='load',
            python_callable=_load,
            op_kwargs={'table_name': config['table']},
        )
        extract >> load

    return dag
# Register every generated DAG in the module namespace so the Airflow
# DAG-file processor can discover it by scanning module globals.
for _cfg in DAG_CONFIGS:
    globals()[_cfg['dag_id']] = create_dag(_cfg)
2.2 YAML/JSON ๊ธฐ๋ฐ ๋์ DAG¶
# dags/yaml_driven_dag.py
import yaml
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# YAML ์ค์ ๋ก๋
config_path = Path(__file__).parent / 'configs' / 'dag_configs.yaml'
# configs/dag_configs.yaml ์์:
"""
dags:
- id: sales_etl
schedule: "0 6 * * *"
tasks:
- name: extract
type: python
function: extract_sales
- name: transform
type: python
function: transform_sales
- name: load
type: python
function: load_sales
"""
def load_config():
    """Parse the YAML DAG configuration file.

    Returns the parsed document (typically a dict with a 'dags' key).
    Uses yaml.safe_load so arbitrary YAML tags cannot execute code.
    """
    # Explicit encoding so non-ASCII config values parse the same way on
    # every platform instead of depending on the locale default.
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
def create_task_callable(func_name: str):
    """Return a placeholder task callable that reports running *func_name*."""

    def _runner(**kwargs):
        # 'ds' is the logical date injected by Airflow's template context.
        print(f"Executing {func_name} for {kwargs['ds']}")

    return _runner
def create_dag_from_yaml(dag_config: dict) -> DAG:
    """Build a DAG whose tasks are declared in a YAML config entry.

    Tasks are chained sequentially in declaration order.
    """
    dag = DAG(
        dag_id=dag_config['id'],
        schedule_interval=dag_config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )

    with dag:
        operators = [
            PythonOperator(
                task_id=spec['name'],
                python_callable=create_task_callable(spec['function']),
            )
            for spec in dag_config['tasks']
        ]
        # Wire each task to the next one declared after it.
        for upstream, downstream in zip(operators, operators[1:]):
            upstream >> downstream

    return dag
# Build and register every DAG declared in the YAML file.
try:
    config = load_config()
    for dag_config in config.get('dags', []):
        dag_id = dag_config['id']
        globals()[dag_id] = create_dag_from_yaml(dag_config)
except Exception as e:
    # Deliberate best-effort: a broken/missing YAML file must not raise at
    # import time, or it would break parsing of this whole DAG file.  The
    # message surfaces in the DAG processor logs.
    print(f"Error loading DAG config: {e}")
2.3 ๋์ Task ์์ฑ¶
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
# List of tables to process
TABLES = ['users', 'orders', 'products', 'reviews', 'inventory']

with DAG(
    dag_id='dynamic_tasks_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end')
    # Generate one processing task per table.
    for table in TABLES:
        # table_name=table binds the CURRENT loop value at definition time,
        # avoiding the classic late-binding closure pitfall inside loops.
        def process_table(table_name=table, **kwargs):
            print(f"Processing table: {table_name}")
        task = PythonOperator(
            task_id=f'process_{table}',
            python_callable=process_table,
            # NOTE(review): op_kwargs also passes table_name explicitly, which
            # overrides the default above — redundant but harmless.
            op_kwargs={'table_name': table},
        )
        start >> task >> end
3. Sensor¶
3.1 ๋ด์ฅ Sensor¶
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.postgres.sensors.postgres import SqlSensor
from datetime import datetime, timedelta
with DAG('sensor_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:
    # 1. FileSensor — wait until a file exists on the filesystem
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/data/input/{{ ds }}/data.csv',  # templated with the run date
        poke_interval=60,  # seconds between checks
        timeout=3600,      # give up after one hour
        mode='poke',       # 'poke' holds a worker slot; 'reschedule' frees it
    )
    # 2. ExternalTaskSensor — wait for a task in another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        execution_delta=timedelta(hours=0),  # same execution_date as this run
        timeout=7200,
        mode='reschedule',  # release the worker slot between checks
    )
    # 3. HttpSensor — poll an HTTP endpoint until response_check passes
    wait_for_api = HttpSensor(
        task_id='wait_for_api',
        http_conn_id='my_api',
        endpoint='/health',
        request_params={},
        response_check=lambda response: response.status_code == 200,
        poke_interval=30,
        timeout=600,
    )
    # 4. SqlSensor — wait until the SQL expression evaluates truthy
    wait_for_data = SqlSensor(
        task_id='wait_for_data',
        conn_id='my_postgres',
        sql="""
        SELECT COUNT(*) > 0
        FROM staging_table
        WHERE date = '{{ ds }}'
        """,
        poke_interval=300,
        timeout=3600,
    )
    # 5. TimeDeltaSensor — simply wait a fixed amount of time
    wait_30_minutes = TimeDeltaSensor(
        task_id='wait_30_minutes',
        delta=timedelta(minutes=30),
    )
3.2 ์ปค์คํ Sensor¶
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import boto3
class S3KeySensorCustom(BaseSensorOperator):
    """Custom sensor that waits until an S3 object exists.

    Succeeds as soon as ``s3://{bucket_name}/{bucket_key}`` is present.
    """

    # bucket_key may contain Jinja templates such as {{ ds }}.
    template_fields = ['bucket_key']

    def __init__(
        self,
        bucket_name: str,
        bucket_key: str,
        aws_conn_id: str = 'aws_default',
        *args,
        **kwargs
    ):
        # @apply_defaults removed: it is a deprecated no-op in Airflow 2.x
        # and was dropped entirely in later releases.
        super().__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.bucket_key = bucket_key
        self.aws_conn_id = aws_conn_id

    def poke(self, context) -> bool:
        """Return True when the object exists (sensor succeeds)."""
        self.log.info(f"Checking for s3://{self.bucket_name}/{self.bucket_key}")
        # Build the client from the configured Airflow connection so that
        # aws_conn_id is actually honored — the original called
        # boto3.client('s3') directly, which silently ignored the parameter
        # and fell back to boto3's default credential chain.
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        s3 = S3Hook(aws_conn_id=self.aws_conn_id).get_conn()
        try:
            s3.head_object(Bucket=self.bucket_name, Key=self.bucket_key)
            self.log.info("File found!")
            return True
        except s3.exceptions.ClientError as e:
            # 404 means "not there yet" — keep poking; anything else is a
            # real error and must propagate.
            if e.response['Error']['Code'] == '404':
                self.log.info("File not found, waiting...")
                return False
            raise
# Usage: templated key, long timeout, reschedule mode to free the worker
# slot between checks.
wait_for_s3 = S3KeySensorCustom(
    task_id='wait_for_s3_file',
    bucket_name='my-bucket',
    bucket_key='data/{{ ds }}/input.parquet',  # rendered via template_fields
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
)
3.3 Sensor ๋ชจ๋¶
# Comparison of the two sensor modes.
# 'poke'       — occupies a worker slot for the whole wait: fast reaction,
#                but wastes worker resources; best for short waits.
# 'reschedule' — releases the worker between checks: resource-efficient,
#                but slower to react; best for long waits.
sensor_modes = {
    'poke': {
        'description': '์์ปค ์ฌ๋กฏ์ ์ ์ ํ๊ณ ๋๊ธฐ',
        'pros': '๋น ๋ฅธ ๋ฐ์ ์๊ฐ',
        'cons': '์์ปค ๋ฆฌ์์ค ๋ญ๋น',
        'use_case': '์งง์ ๋๊ธฐ ์๊ฐ ์์'
    },
    'reschedule': {
        'description': '์์ปค ๋ฐํ ํ ์ฌ์ค์ผ์ค',
        'pros': '์์ปค ๋ฆฌ์์ค ํจ์จ์ ์ฌ์ฉ',
        'cons': '๋ค์ ๋๋ฆฐ ๋ฐ์ ์๊ฐ',
        'use_case': '๊ธด ๋๊ธฐ ์๊ฐ ์์'
    }
}
# Recommended settings for a long wait.
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    poke_interval=300,   # check every 5 minutes
    timeout=86400,       # 24-hour timeout
    mode='reschedule',   # free the worker slot between checks
    soft_fail=True,      # on timeout, mark the task skipped instead of failed
)
4. Hook๊ณผ Connection¶
4.1 Connection ์ค์ ¶
# Airflow UI ๋๋ CLI๋ก Connection ์ค์
# Admin > Connections > Add
# CLI๋ก Connection ์ถ๊ฐ
"""
airflow connections add 'my_postgres' \
--conn-type 'postgres' \
--conn-host 'localhost' \
--conn-port '5432' \
--conn-login 'user' \
--conn-password 'password' \
--conn-schema 'mydb'
airflow connections add 'my_s3' \
--conn-type 'aws' \
--conn-extra '{"aws_access_key_id": "xxx", "aws_secret_access_key": "yyy", "region_name": "us-east-1"}'
"""
# ํ๊ฒฝ ๋ณ์๋ก Connection ์ค์
# AIRFLOW_CONN_MY_POSTGRES='postgresql://user:password@localhost:5432/mydb'
4.2 Hook ์ฌ์ฉ¶
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
def use_postgres_hook(**kwargs):
    """Demonstrate the common PostgresHook access patterns."""
    hook = PostgresHook(postgres_conn_id='my_postgres')
    # Fetch rows as a list of tuples
    records = hook.get_records("SELECT * FROM users LIMIT 10")
    # Fetch as a pandas DataFrame
    df = hook.get_pandas_df("SELECT * FROM users")
    # Bulk insert
    hook.insert_rows(
        table='users',
        rows=[(1, 'John'), (2, 'Jane')],
        target_fields=['id', 'name']
    )
    # Raw DB-API connection: close the cursor and the connection when done —
    # the original left both open (a resource leak on the pool/server).
    conn = hook.get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.execute("UPDATE users SET active = true")
        conn.commit()
    finally:
        conn.close()
def use_s3_hook(**kwargs):
    """Demonstrate the common S3Hook operations.

    Fixes a garbled comment in the original that had spilled onto its own
    line as a bare (non-comment) identifier, which would raise NameError.
    """
    hook = S3Hook(aws_conn_id='my_s3')
    # Upload a local file
    hook.load_file(
        filename='/tmp/data.csv',
        key='data/output.csv',
        bucket_name='my-bucket',
        replace=True
    )
    # Download a file
    hook.download_file(
        key='data/input.csv',
        bucket_name='my-bucket',
        local_path='/tmp/input.csv'
    )
    # List keys under a prefix
    keys = hook.list_keys(
        bucket_name='my-bucket',
        prefix='data/',
        delimiter='/'
    )
def use_http_hook(**kwargs):
    """Call an HTTP endpoint through HttpHook and return the JSON body."""
    http = HttpHook(http_conn_id='my_api', method='GET')
    resp = http.run(
        endpoint='/api/data',
        headers={'Authorization': 'Bearer token'},
        data={'param': 'value'}
    )
    return resp.json()
4.3 ์ปค์คํ Hook¶
from airflow.hooks.base import BaseHook
from typing import Any
import requests
class MyCustomHook(BaseHook):
    """Hook for a custom HTTP API authenticated with a bearer token."""

    # Airflow hook metadata: how the connection id is named and displayed.
    conn_name_attr = 'my_custom_conn_id'
    default_conn_name = 'my_custom_default'
    conn_type = 'http'
    hook_name = 'My Custom Hook'

    def __init__(self, my_custom_conn_id: str = default_conn_name):
        super().__init__()
        self.my_custom_conn_id = my_custom_conn_id
        # Both are populated by get_conn().
        self.base_url = None
        self.api_key = None

    def get_conn(self):
        """Resolve the Airflow connection and cache base URL and API key."""
        conn = self.get_connection(self.my_custom_conn_id)
        self.base_url = f"https://{conn.host}"
        # The connection's password field holds the API token.
        self.api_key = conn.password
        return conn

    def make_request(self, endpoint: str, method: str = 'GET', data: dict = None) -> Any:
        """Send an authenticated JSON request and return the parsed body.

        Raises requests.HTTPError on non-2xx responses.
        """
        self.get_conn()
        response = requests.request(
            method=method,
            url=f"{self.base_url}{endpoint}",
            headers={
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            },
            json=data
        )
        response.raise_for_status()
        return response.json()
# Usage
def call_custom_api(**kwargs):
    """Fetch /users through MyCustomHook; the result goes to XCom."""
    api = MyCustomHook(my_custom_conn_id='my_api')
    return api.make_request('/users', method='GET')
5. TaskGroup¶
5.1 TaskGroup ๊ธฐ๋ณธ ์ฌ์ฉ¶
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
with DAG('taskgroup_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:
    start = EmptyOperator(task_id='start')
    # Group related extraction tasks so they collapse into one node in the UI.
    with TaskGroup(group_id='extract_group') as extract_group:
        extract_users = PythonOperator(
            task_id='extract_users',
            python_callable=lambda: print("Extracting users")
        )
        extract_orders = PythonOperator(
            task_id='extract_orders',
            python_callable=lambda: print("Extracting orders")
        )
        extract_products = PythonOperator(
            task_id='extract_products',
            python_callable=lambda: print("Extracting products")
        )
    with TaskGroup(group_id='transform_group') as transform_group:
        transform_users = PythonOperator(
            task_id='transform_users',
            python_callable=lambda: print("Transforming users")
        )
        transform_orders = PythonOperator(
            task_id='transform_orders',
            python_callable=lambda: print("Transforming orders")
        )
    with TaskGroup(group_id='load_group') as load_group:
        load_warehouse = PythonOperator(
            task_id='load_warehouse',
            python_callable=lambda: print("Loading to warehouse")
        )
    end = EmptyOperator(task_id='end')
    # Dependencies between whole groups: every task in an upstream group
    # must finish before the downstream group starts.
    start >> extract_group >> transform_group >> load_group >> end
5.2 ์ค์ฒฉ TaskGroup¶
from airflow.utils.task_group import TaskGroup
# NOTE: the '...' below are placeholders for the omitted arguments — this
# snippet is illustrative and not runnable as-is.
with DAG('nested_taskgroup', ...) as dag:
    with TaskGroup(group_id='data_processing') as data_processing:
        # Nested groups render as collapsible sub-graphs in the Graph view.
        with TaskGroup(group_id='source_a') as source_a:
            extract_a = PythonOperator(task_id='extract', ...)
            transform_a = PythonOperator(task_id='transform', ...)
            extract_a >> transform_a
        with TaskGroup(group_id='source_b') as source_b:
            extract_b = PythonOperator(task_id='extract', ...)
            transform_b = PythonOperator(task_id='transform', ...)
            extract_b >> transform_b
    # Join after the two source groups have run in parallel.
    join = EmptyOperator(task_id='join')
    [source_a, source_b] >> join
5.3 ๋์ TaskGroup¶
from airflow.utils.task_group import TaskGroup
SOURCES = ['mysql', 'postgres', 'mongodb']

# NOTE: '...' is a placeholder for the remaining DAG arguments.
with DAG('dynamic_taskgroup', ...) as dag:
    start = EmptyOperator(task_id='start')
    task_groups = []
    # One TaskGroup per source system.
    for source in SOURCES:
        with TaskGroup(group_id=f'process_{source}') as tg:
            extract = PythonOperator(
                task_id='extract',
                # s=source binds the current loop value at definition time,
                # avoiding the late-binding closure pitfall.
                python_callable=lambda s=source: print(f"Extract from {s}")
            )
            load = PythonOperator(
                task_id='load',
                python_callable=lambda s=source: print(f"Load {s}")
            )
            extract >> load
        task_groups.append(tg)
    end = EmptyOperator(task_id='end')
    # A list of groups may appear in the middle of a dependency chain.
    start >> task_groups >> end
6. ๋ถ๊ธฐ ์ฒ๋ฆฌ์ ์กฐ๊ฑด๋ถ ์คํ¶
6.1 BranchPythonOperator¶
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
def choose_branch(**kwargs):
    """Pick the downstream task id based on the upstream row count.

    BranchPythonOperator follows only the task whose id is returned.
    """
    count = kwargs['ti'].xcom_pull(task_ids='count_data')
    if count > 1000:
        return 'process_large'
    if count > 0:
        return 'process_small'
    return 'skip_processing'
# NOTE: '...' is a placeholder for the remaining DAG arguments.
with DAG('branch_example', ...) as dag:
    count_data = PythonOperator(
        task_id='count_data',
        python_callable=lambda: 500,  # example return value pushed to XCom
    )
    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )
    process_large = EmptyOperator(task_id='process_large')
    process_small = EmptyOperator(task_id='process_small')
    skip_processing = EmptyOperator(task_id='skip_processing')
    # Re-join after the branch: run when at least one upstream succeeded and
    # none failed — skipped branches do not block it.
    join = EmptyOperator(
        task_id='join',
        trigger_rule='none_failed_min_one_success'
    )
    count_data >> branch >> [process_large, process_small, skip_processing] >> join
6.2 ShortCircuitOperator¶
from airflow.operators.python import ShortCircuitOperator
def check_condition(**kwargs):
    """Return True on weekdays; a False return skips all downstream tasks."""
    run_date = datetime.strptime(kwargs['ds'], '%Y-%m-%d')
    # weekday(): Monday=0 ... Sunday=6, so < 5 means Monday-Friday.
    return run_date.weekday() < 5
# NOTE: the '...' below are placeholders for the omitted arguments — this
# snippet is illustrative and not runnable as-is.
with DAG('shortcircuit_example', ...) as dag:
    check = ShortCircuitOperator(
        task_id='check_weekday',
        python_callable=check_condition,
    )
    # If check returns False, every downstream task is skipped.
    process = PythonOperator(task_id='process', ...)
    load = PythonOperator(task_id='load', ...)
    check >> process >> load
์ฐ์ต ๋ฌธ์ ¶
๋ฌธ์ 1: XCom ํ์ฉ¶
๋ ๊ฐ์ Task์์ ๊ฐ๊ฐ ์ซ์๋ฅผ ๋ฐํํ๊ณ , ์ธ ๋ฒ์งธ Task์์ ๋ ์ซ์์ ํฉ์ ๊ณ์ฐํ๋ DAG๋ฅผ ์์ฑํ์ธ์.
๋ฌธ์ 2: ๋์ DAG¶
ํ ์ด๋ธ ๋ชฉ๋ก(users, orders, products)์ ๊ธฐ๋ฐ์ผ๋ก ๊ฐ ํ ์ด๋ธ์ ๋ํ ETL Task๋ฅผ ๋์ ์ผ๋ก ์์ฑํ๋ DAG๋ฅผ ์์ฑํ์ธ์.
๋ฌธ์ 3: Sensor ์ฌ์ฉ¶
ํ์ผ์ด ์์ฑ๋ ๋๊น์ง ๋๊ธฐํ ํ ์ฒ๋ฆฌํ๋ DAG๋ฅผ ์์ฑํ์ธ์.
์์ฝ¶
| ๊ธฐ๋ฅ | ์ค๋ช |
|---|---|
| XCom | Task ๊ฐ ๋ฐ์ดํฐ ๊ณต์ ๋ฉ์ปค๋์ฆ |
| ๋์ DAG | ์ค์ ๊ธฐ๋ฐ์ผ๋ก DAG/Task ๋์ ์์ฑ |
| Sensor | ์กฐ๊ฑด ์ถฉ์กฑ๊น์ง ๋๊ธฐํ๋ Operator |
| Hook | ์ธ๋ถ ์์คํ ์ฐ๊ฒฐ ์ธํฐํ์ด์ค |
| TaskGroup | ๊ด๋ จ Task๋ฅผ ๊ทธ๋ฃนํํ์ฌ ์๊ฐํ |
| Branch | ์กฐ๊ฑด์ ๋ฐ๋ฅธ ๋ถ๊ธฐ ์ฒ๋ฆฌ |