1"""
2Airflow ETL ํ์ดํ๋ผ์ธ DAG ์์
3
4์ด DAG๋ ์ค์ ETL ํ์ดํ๋ผ์ธ์ ๊ตฌ์กฐ๋ฅผ ๋ณด์ฌ์ค๋๋ค:
5- Extract: ๋ฐ์ดํฐ ์์ค์์ ์ถ์ถ
6- Transform: ๋ฐ์ดํฐ ์ ์ ๋ฐ ๋ณํ
7- Load: ๋ชฉ์ ์ง์ ์ ์ฌ
8- Quality Check: ๋ฐ์ดํฐ ํ์ง ๊ฒ์ฆ
9
10์คํ: airflow dags test etl_pipeline 2024-01-01
11"""
12
13from datetime import datetime, timedelta
14from airflow import DAG
15from airflow.operators.python import PythonOperator, BranchPythonOperator
16from airflow.operators.empty import EmptyOperator
17from airflow.utils.task_group import TaskGroup
18from airflow.utils.trigger_rule import TriggerRule
19import json
20import os
21
22
# Default arguments applied to every task in this DAG
# (individual operators may still override them).
default_args = {
    'owner': 'data_team',  # owning team, shown in the Airflow UI
    'depends_on_past': False,  # each run is independent of the previous run's state
    'retries': 2,  # retry a failed task up to twice
    'retry_delay': timedelta(minutes=5),  # wait between retry attempts
}
29
30
31# ============================================
32# Extract ํจ์๋ค
33# ============================================
def extract_orders(**context):
    """Extract order rows for the execution date and publish them via XCom.

    The rows are simulated; a real implementation would query a database.
    Pushes the raw rows under key ``raw_orders`` and returns the row count.
    """
    ds = context['ds']
    print(f"Extracting orders for {ds}")

    # Simulated source data (stand-in for a DB query).
    raw_rows = [
        {'order_id': 1, 'customer_id': 101, 'amount': 150.00, 'status': 'completed'},
        {'order_id': 2, 'customer_id': 102, 'amount': 250.50, 'status': 'completed'},
        {'order_id': 3, 'customer_id': 101, 'amount': 75.25, 'status': 'pending'},
        {'order_id': 4, 'customer_id': 103, 'amount': 320.00, 'status': 'completed'},
        {'order_id': 5, 'customer_id': 102, 'amount': 99.99, 'status': 'cancelled'},
    ]

    # Hand the rows to downstream tasks through XCom.
    context['ti'].xcom_push(key='raw_orders', value=raw_rows)
    print(f"Extracted {len(raw_rows)} orders")
    return len(raw_rows)
52
53
def extract_customers(**context):
    """Extract customer reference data and publish it via XCom.

    Simulated rows; pushes them under key ``raw_customers`` and returns
    the row count.
    """
    customer_rows = [
        {'customer_id': 101, 'name': 'Alice', 'segment': 'Gold'},
        {'customer_id': 102, 'name': 'Bob', 'segment': 'Silver'},
        {'customer_id': 103, 'name': 'Charlie', 'segment': 'Bronze'},
    ]

    context['ti'].xcom_push(key='raw_customers', value=customer_rows)
    print(f"Extracted {len(customer_rows)} customers")
    return len(customer_rows)
65
66
67# ============================================
68# Transform ํจ์๋ค
69# ============================================
def transform_orders(**context):
    """Keep only completed orders and stamp them with the run date.

    Reads ``raw_orders`` from the extract task, pushes the filtered rows
    under ``transformed_orders``, and returns how many rows were kept.
    """
    ti = context['ti']
    orders = ti.xcom_pull(task_ids='extract.extract_orders', key='raw_orders')

    # Only 'completed' orders survive; each keeps its identifiers/amount
    # and gains the logical run date.
    transformed = [
        {
            'order_id': row['order_id'],
            'customer_id': row['customer_id'],
            'amount': row['amount'],
            'order_date': context['ds'],
        }
        for row in orders
        if row['status'] == 'completed'
    ]

    ti.xcom_push(key='transformed_orders', value=transformed)
    print(f"Transformed {len(transformed)} orders (from {len(orders)})")
    return len(transformed)
90
91
def enrich_orders(**context):
    """Join customer attributes onto each transformed order.

    Reads ``transformed_orders`` and ``raw_customers`` from XCom, attaches
    ``customer_name`` and ``customer_segment`` (falling back to 'Unknown'
    when the customer is not found), pushes the result as
    ``enriched_orders``, and returns the enriched list.
    """
    ti = context['ti']
    orders = ti.xcom_pull(task_ids='transform.transform_orders', key='transformed_orders')
    customers = ti.xcom_pull(task_ids='extract.extract_customers', key='raw_customers')

    # Index customers by id for O(1) lookup during the join.
    by_id = {row['customer_id']: row for row in customers}

    def _with_customer(order):
        # One enriched record; missing customers degrade to 'Unknown'.
        info = by_id.get(order['customer_id'], {})
        return {
            **order,
            'customer_name': info.get('name', 'Unknown'),
            'customer_segment': info.get('segment', 'Unknown'),
        }

    enriched = [_with_customer(order) for order in orders]

    ti.xcom_push(key='enriched_orders', value=enriched)
    print(f"Enriched {len(enriched)} orders")
    return enriched
113
114
115# ============================================
116# Load ํจ์๋ค
117# ============================================
def load_to_warehouse(**context):
    """Load the enriched orders into the warehouse (simulated).

    A real implementation would issue DB INSERTs; here every record is
    printed instead. Returns the number of records loaded.
    """
    enriched_orders = context['ti'].xcom_pull(
        task_ids='transform.enrich_orders', key='enriched_orders'
    )

    total = len(enriched_orders)
    print(f"Loading {total} records to warehouse")
    for record in enriched_orders:
        print(f"  INSERT: {record}")

    return total
129
130
131# ============================================
132# Quality Check ํจ์๋ค
133# ============================================
def check_row_count(**context):
    """Verify there is at least one row to load.

    Pushes the row count under ``row_count`` and returns it.

    Raises:
        ValueError: if the enriched result set is empty.
    """
    ti = context['ti']
    rows = ti.xcom_pull(task_ids='transform.enrich_orders', key='enriched_orders')

    count = len(rows)
    print(f"Row count check: {count}")

    # An empty load is treated as a pipeline failure, not a no-op.
    if not rows:
        raise ValueError("No data to load!")

    ti.xcom_push(key='row_count', value=count)
    return count
147
148
def check_data_quality(**context):
    """Validate the enriched orders and report whether issues were found.

    Checks every row for a missing ``order_id`` and a negative ``amount``.
    Any problems are pushed to XCom under ``quality_issues``.

    Returns:
        str: 'has_issues' if at least one problem was found, else 'no_issues'.
        (The return value is read by the downstream branch decision.)
    """
    ti = context['ti']
    enriched_orders = ti.xcom_pull(task_ids='transform.enrich_orders', key='enriched_orders')

    errors = []

    for order in enriched_orders:
        # NULL check
        if order.get('order_id') is None:
            # Plain literal (original used an f-string with no placeholder).
            errors.append("Missing order_id")

        # Range check. Coalesce a missing/None amount to 0 so the
        # comparison cannot raise TypeError on a None value.
        if (order.get('amount') or 0) < 0:
            errors.append(f"Negative amount: {order['order_id']}")

    if errors:
        print(f"Quality issues found: {errors}")
        ti.xcom_push(key='quality_issues', value=errors)
        return 'has_issues'
    else:
        print("Quality check passed")
        return 'no_issues'
172
173
def decide_next_step(**context):
    """Pick the downstream task id based on the data-quality verdict.

    Pulls the return value of ``quality.check_data_quality`` and routes to
    the issue handler when problems were found, otherwise to the load task.
    """
    verdict = context['ti'].xcom_pull(task_ids='quality.check_data_quality')
    return 'quality.handle_issues' if verdict == 'has_issues' else 'load'
183
184
def handle_quality_issues(**context):
    """React to recorded data-quality problems (simulated).

    Pulls the issue list pushed by the quality check and logs it; a real
    pipeline would send alerts and persist the report here.
    """
    issues = context['ti'].xcom_pull(
        task_ids='quality.check_data_quality', key='quality_issues'
    )
    print(f"Handling quality issues: {issues}")
191
192
193# ============================================
194# Notification ํจ์
195# ============================================
def send_success_notification(**context):
    """Announce a successful pipeline run (simulated).

    Reads the row count recorded by the quality group and prints a summary;
    a real implementation would notify via Slack, email, etc.
    """
    ti = context['ti']
    row_count = ti.xcom_pull(task_ids='quality.check_row_count', key='row_count')

    message = f"""
    ETL Pipeline Completed Successfully!
    Date: {context['ds']}
    Records Loaded: {row_count}
    """
    print(message)
208
209
210# DAG ์ ์
with DAG(
    dag_id='etl_pipeline',
    default_args=default_args,
    description='ETL ํ์ดํ๋ผ์ธ ์์ ',
    schedule_interval='0 6 * * *',  # daily at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
) as dag:

    start = EmptyOperator(task_id='start')

    # Extract: orders and customers are pulled independently, in parallel.
    with TaskGroup(group_id='extract') as extract_group:
        extract_orders_task = PythonOperator(
            task_id='extract_orders',
            python_callable=extract_orders,
        )
        extract_customers_task = PythonOperator(
            task_id='extract_customers',
            python_callable=extract_customers,
        )

    # Transform: filter to completed orders, then join customer attributes.
    with TaskGroup(group_id='transform') as transform_group:
        transform_task = PythonOperator(
            task_id='transform_orders',
            python_callable=transform_orders,
        )
        enrich_task = PythonOperator(
            task_id='enrich_orders',
            python_callable=enrich_orders,
        )
        transform_task >> enrich_task

    # Quality: the two checks run in parallel. handle_issues lives in this
    # group so its task id stays 'quality.handle_issues' (the id returned
    # by decide_next_step), but it is wired downstream of the branch only.
    with TaskGroup(group_id='quality') as quality_group:
        row_count_task = PythonOperator(
            task_id='check_row_count',
            python_callable=check_row_count,
        )
        quality_task = PythonOperator(
            task_id='check_data_quality',
            python_callable=check_data_quality,
        )
        handle_issues_task = PythonOperator(
            task_id='handle_issues',
            python_callable=handle_quality_issues,
            trigger_rule=TriggerRule.NONE_FAILED,
        )

    # Branch on the quality verdict: clean data goes to load, otherwise
    # the recorded issues are handled.
    branch_task = BranchPythonOperator(
        task_id='branch_on_quality',
        python_callable=decide_next_step,
    )

    # Load
    load_task = PythonOperator(
        task_id='load',
        python_callable=load_to_warehouse,
    )

    # Notify: fires when the surviving branch succeeded and nothing failed.
    notify_task = PythonOperator(
        task_id='notify',
        python_callable=send_success_notification,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    end = EmptyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    # Task dependencies.
    # NOTE: wiring `quality_group >> branch_task` would connect every leaf
    # of the quality group — including handle_issues, which has no in-group
    # downstream — upstream of the branch, while `branch_task >>
    # handle_issues_task` puts it downstream: a cycle that fails DAG import.
    # The check tasks are therefore wired to the branch explicitly.
    start >> extract_group >> transform_group
    transform_group >> [row_count_task, quality_task]
    [row_count_task, quality_task] >> branch_task
    branch_task >> [load_task, handle_issues_task]
    [load_task, handle_issues_task] >> notify_task >> end
291
292
if __name__ == "__main__":
    # Running this file directly performs a local debug run of the whole
    # DAG (Airflow 2.5+ dag.test()), without a scheduler or database setup.
    dag.test()