# etl_dag.py
"""
Airflow ETL ํŒŒ์ดํ”„๋ผ์ธ DAG ์˜ˆ์ œ

์ด DAG๋Š” ์‹ค์ œ ETL ํŒŒ์ดํ”„๋ผ์ธ์˜ ๊ตฌ์กฐ๋ฅผ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค:
- Extract: ๋ฐ์ดํ„ฐ ์†Œ์Šค์—์„œ ์ถ”์ถœ
- Transform: ๋ฐ์ดํ„ฐ ์ •์ œ ๋ฐ ๋ณ€ํ™˜
- Load: ๋ชฉ์ ์ง€์— ์ ์žฌ
- Quality Check: ๋ฐ์ดํ„ฐ ํ’ˆ์งˆ ๊ฒ€์ฆ

์‹คํ–‰: airflow dags test etl_pipeline 2024-01-01
"""

import json
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

 23default_args = {
 24    'owner': 'data_team',
 25    'depends_on_past': False,
 26    'retries': 2,
 27    'retry_delay': timedelta(minutes=5),
 28}
 29
 30
# ============================================
# Extract functions
# ============================================
 34def extract_orders(**context):
 35    """์ฃผ๋ฌธ ๋ฐ์ดํ„ฐ ์ถ”์ถœ"""
 36    ds = context['ds']
 37    print(f"Extracting orders for {ds}")
 38
 39    # ์‹œ๋ฎฌ๋ ˆ์ด์…˜: ์‹ค์ œ๋กœ๋Š” DB ์ฟผ๋ฆฌ
 40    orders = [
 41        {'order_id': 1, 'customer_id': 101, 'amount': 150.00, 'status': 'completed'},
 42        {'order_id': 2, 'customer_id': 102, 'amount': 250.50, 'status': 'completed'},
 43        {'order_id': 3, 'customer_id': 101, 'amount': 75.25, 'status': 'pending'},
 44        {'order_id': 4, 'customer_id': 103, 'amount': 320.00, 'status': 'completed'},
 45        {'order_id': 5, 'customer_id': 102, 'amount': 99.99, 'status': 'cancelled'},
 46    ]
 47
 48    # XCom์œผ๋กœ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ
 49    context['ti'].xcom_push(key='raw_orders', value=orders)
 50    print(f"Extracted {len(orders)} orders")
 51    return len(orders)
 52
 53
 54def extract_customers(**context):
 55    """๊ณ ๊ฐ ๋ฐ์ดํ„ฐ ์ถ”์ถœ"""
 56    customers = [
 57        {'customer_id': 101, 'name': 'Alice', 'segment': 'Gold'},
 58        {'customer_id': 102, 'name': 'Bob', 'segment': 'Silver'},
 59        {'customer_id': 103, 'name': 'Charlie', 'segment': 'Bronze'},
 60    ]
 61
 62    context['ti'].xcom_push(key='raw_customers', value=customers)
 63    print(f"Extracted {len(customers)} customers")
 64    return len(customers)
 65
 66
# ============================================
# Transform functions
# ============================================
 70def transform_orders(**context):
 71    """์ฃผ๋ฌธ ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜"""
 72    ti = context['ti']
 73    orders = ti.xcom_pull(task_ids='extract.extract_orders', key='raw_orders')
 74
 75    # ๋ณ€ํ™˜ ๋กœ์ง
 76    transformed = []
 77    for order in orders:
 78        # completed ์ฃผ๋ฌธ๋งŒ ํฌํ•จ
 79        if order['status'] == 'completed':
 80            transformed.append({
 81                'order_id': order['order_id'],
 82                'customer_id': order['customer_id'],
 83                'amount': order['amount'],
 84                'order_date': context['ds'],
 85            })
 86
 87    ti.xcom_push(key='transformed_orders', value=transformed)
 88    print(f"Transformed {len(transformed)} orders (from {len(orders)})")
 89    return len(transformed)
 90
 91
 92def enrich_orders(**context):
 93    """์ฃผ๋ฌธ ๋ฐ์ดํ„ฐ์— ๊ณ ๊ฐ ์ •๋ณด ์ถ”๊ฐ€"""
 94    ti = context['ti']
 95    orders = ti.xcom_pull(task_ids='transform.transform_orders', key='transformed_orders')
 96    customers = ti.xcom_pull(task_ids='extract.extract_customers', key='raw_customers')
 97
 98    # ๊ณ ๊ฐ ์ •๋ณด ๋งคํ•‘
 99    customer_map = {c['customer_id']: c for c in customers}
100
101    enriched = []
102    for order in orders:
103        customer = customer_map.get(order['customer_id'], {})
104        enriched.append({
105            **order,
106            'customer_name': customer.get('name', 'Unknown'),
107            'customer_segment': customer.get('segment', 'Unknown'),
108        })
109
110    ti.xcom_push(key='enriched_orders', value=enriched)
111    print(f"Enriched {len(enriched)} orders")
112    return enriched
113
114
# ============================================
# Load functions
# ============================================
def load_to_warehouse(**context):
    """Load the enriched orders into the warehouse (simulated).

    Pulls ``enriched_orders`` from XCom, "inserts" each record by
    printing it, and returns the number of records loaded.
    """
    records = context['ti'].xcom_pull(task_ids='transform.enrich_orders', key='enriched_orders')

    # Simulation: a real implementation would batch INSERTs into the DB.
    print(f"Loading {len(records)} records to warehouse")
    for record in records:
        print(f"  INSERT: {record}")

    return len(records)

# ============================================
# Quality check functions
# ============================================
def check_row_count(**context):
    """Validate that there is at least one row to load.

    Pulls the enriched orders from XCom and publishes the count under
    ``row_count``.

    Returns:
        int: number of enriched order rows.

    Raises:
        ValueError: when there are no rows to load (including the case
            where the upstream XCom is missing).
    """
    ti = context['ti']
    enriched_orders = ti.xcom_pull(task_ids='transform.enrich_orders', key='enriched_orders')

    # BUGFIX: xcom_pull returns None when the upstream task pushed
    # nothing; the original len(None) raised TypeError instead of the
    # intended ValueError. Treat a missing XCom as an empty result.
    row_count = len(enriched_orders or [])
    print(f"Row count check: {row_count}")

    if row_count == 0:
        raise ValueError("No data to load!")

    ti.xcom_push(key='row_count', value=row_count)
    return row_count

def check_data_quality(**context):
    """Run row-level quality checks on the enriched orders.

    Checks performed:
    - ``order_id`` must be present (not None / missing)
    - ``amount`` must be non-negative

    When issues are found, pushes the list of messages to XCom under
    ``quality_issues``.

    Returns:
        str: 'has_issues' when any check failed, otherwise 'no_issues'.
    """
    ti = context['ti']
    enriched_orders = ti.xcom_pull(task_ids='transform.enrich_orders', key='enriched_orders')

    errors = []

    for order in enriched_orders:
        # NULL check. (Fixed: was an f-string with no placeholder.)
        if order.get('order_id') is None:
            errors.append("Missing order_id")

        # Range check. BUGFIX: use .get() for order_id here too, so a row
        # that is both missing its id AND has a negative amount no longer
        # raises KeyError while building the error message.
        if order.get('amount', 0) < 0:
            errors.append(f"Negative amount: {order.get('order_id')}")

    if errors:
        print(f"Quality issues found: {errors}")
        ti.xcom_push(key='quality_issues', value=errors)
        return 'has_issues'
    else:
        print("Quality check passed")
        return 'no_issues'

def decide_next_step(**context):
    """Branch callable: route on the quality-check outcome.

    Returns the task_id the BranchPythonOperator should follow:
    ``quality.handle_issues`` when issues were found, else ``load``.
    """
    verdict = context['ti'].xcom_pull(task_ids='quality.check_data_quality')
    return 'quality.handle_issues' if verdict == 'has_issues' else 'load'

def handle_quality_issues(**context):
    """React to detected quality issues (stub).

    Pulls the issue list pushed by the quality check and logs it.
    A production version would send alerts, write audit logs, etc.
    """
    issues = context['ti'].xcom_pull(task_ids='quality.check_data_quality', key='quality_issues')
    print(f"Handling quality issues: {issues}")

# ============================================
# Notification function
# ============================================
def send_success_notification(**context):
    """Emit a success summary for the run.

    Pulls the validated row count from the quality group and prints a
    summary. A real pipeline would notify via Slack, email, etc.
    """
    loaded = context['ti'].xcom_pull(task_ids='quality.check_row_count', key='row_count')

    summary = f"""
    ETL Pipeline Completed Successfully!
    Date: {context['ds']}
    Records Loaded: {loaded}
    """
    print(summary)

# DAG definition
with DAG(
    dag_id='etl_pipeline',
    default_args=default_args,
    description='ETL ํŒŒ์ดํ”„๋ผ์ธ ์˜ˆ์ œ',
    schedule_interval='0 6 * * *',  # daily at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
) as dag:

    start = EmptyOperator(task_id='start')

    # Extract TaskGroup: the two source pulls run in parallel.
    with TaskGroup(group_id='extract') as extract_group:
        extract_orders_task = PythonOperator(
            task_id='extract_orders',
            python_callable=extract_orders,
        )
        extract_customers_task = PythonOperator(
            task_id='extract_customers',
            python_callable=extract_customers,
        )

    # Transform TaskGroup: filter first, then join with customer data.
    with TaskGroup(group_id='transform') as transform_group:
        transform_task = PythonOperator(
            task_id='transform_orders',
            python_callable=transform_orders,
        )
        enrich_task = PythonOperator(
            task_id='enrich_orders',
            python_callable=enrich_orders,
        )
        transform_task >> enrich_task

    # Quality TaskGroup: the two checks deliberately run in parallel.
    with TaskGroup(group_id='quality') as quality_group:
        row_count_task = PythonOperator(
            task_id='check_row_count',
            python_callable=check_row_count,
        )
        quality_task = PythonOperator(
            task_id='check_data_quality',
            python_callable=check_data_quality,
        )
        handle_issues_task = PythonOperator(
            task_id='handle_issues',
            python_callable=handle_quality_issues,
            trigger_rule=TriggerRule.NONE_FAILED,
        )
        # BUGFIX: the original had a bare `[row_count_task, quality_task]`
        # expression statement here — a no-op that creates no dependency.
        # The checks run in parallel by default, so no edge is needed.

    # Branch on the quality-check outcome (returns a task_id to follow).
    branch_task = BranchPythonOperator(
        task_id='branch_on_quality',
        python_callable=decide_next_step,
    )

    # Load
    load_task = PythonOperator(
        task_id='load',
        python_callable=load_to_warehouse,
    )

    # Notify: fires when no upstream failed and at least one succeeded,
    # so it runs on both the load path and the handle-issues path.
    notify_task = PythonOperator(
        task_id='notify',
        python_callable=send_success_notification,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    end = EmptyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # Task dependencies
    start >> extract_group >> transform_group >> quality_group >> branch_task
    branch_task >> [load_task, handle_issues_task]
    [load_task, handle_issues_task] >> notify_task >> end

# Local debugging entry point: `python etl_dag.py` runs a one-off DAG test.
# NOTE(review): dag.test() requires Airflow 2.5+ — confirm the deployed version.
if __name__ == "__main__":
    dag.test()