1"""
Basic Airflow DAG example

This DAG demonstrates a basic Airflow workflow:
- Run Python functions with PythonOperator
- Run shell commands with BashOperator
- Define task dependencies

Run: airflow dags test simple_dag 2024-01-01
10"""
11
12from datetime import datetime, timedelta
13from airflow import DAG
14from airflow.operators.python import PythonOperator
15from airflow.operators.bash import BashOperator
16from airflow.operators.empty import EmptyOperator
17
18
# Default argument settings
default_args = {
    # Owner label for the tasks that receive these defaults.
    "owner": "data_team",
    "depends_on_past": False,
    # A recipient list is configured, but both e-mail triggers are off.
    "email": ["data-alerts@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    # One retry, with a five-minute delay.
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
29
30
# Python function definitions
def print_hello():
    """Print a greeting and return the marker string ``"hello_returned"``."""
    greeting = "Hello from Airflow!"
    print(greeting)
    return "hello_returned"
36
37
def print_date(**context):
    """Print and return the ``ds`` entry of the task context.

    Args:
        **context: Keyword context; must contain ``ds``.

    Returns:
        The value stored under ``ds``, unchanged.

    Raises:
        KeyError: If ``ds`` is missing from the context.
    """
    ds = context["ds"]
    print(f"Execution date: {ds}")
    return ds
43
44
def process_data(value: int, multiplier: int = 2, **context):
    """Multiply ``value`` by ``multiplier`` and publish the product via XCom.

    Args:
        value: Number to scale.
        multiplier: Factor applied to ``value``; defaults to 2.
        **context: Keyword context; must contain ``ti``, an object exposing
            ``xcom_push(key=..., value=...)``.

    Returns:
        The product ``value * multiplier``.

    Raises:
        KeyError: If ``ti`` is missing from the context.
    """
    product = value * multiplier
    print(f"Processing: {value} * {multiplier} = {product}")

    # Publish under an explicit key so the value can be pulled by name.
    task_instance = context["ti"]
    task_instance.xcom_push(key="processed_value", value=product)
    return product
53
54
def summarize(**context):
    """Print a short report built from the results of earlier tasks.

    Args:
        **context: Keyword context; must contain ``ti`` (an object exposing
            ``xcom_pull``) and ``ds`` (printed as the execution date).

    Raises:
        KeyError: If ``ti`` or ``ds`` is missing from the context.
    """
    task_instance = context["ti"]

    # hello_task is pulled without a key, so the task instance's default key
    # applies; process_task is pulled under the key process_data pushes.
    greeting = task_instance.xcom_pull(task_ids="hello_task")
    product = task_instance.xcom_pull(
        task_ids="process_task", key="processed_value"
    )

    print("Summary:")
    print(f" - Hello result: {greeting}")
    print(f" - Processed value: {product}")
    run_date = context["ds"]
    print(f" - Execution date: {run_date}")
67
68
# DAG definition
with DAG(
    dag_id='simple_dag',
    default_args=default_args,
    description='간단한 Airflow DAG 예제',
    # `schedule` replaces `schedule_interval`, which is deprecated since
    # Airflow 2.4 and removed in Airflow 3. This file already needs
    # Airflow >= 2.5 for the dag.test() call at the bottom.
    schedule='@daily',  # run once a day
    start_date=datetime(2024, 1, 1),
    catchup=False,  # skip past runs
    tags=['example', 'tutorial'],
) as dag:

    # Task definitions
    start = EmptyOperator(task_id='start')

    hello_task = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
    )

    date_task = PythonOperator(
        task_id='date_task',
        python_callable=print_date,
    )

    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo "Current time: $(date)" && sleep 2',
    )

    process_task = PythonOperator(
        task_id='process_task',
        python_callable=process_data,
        op_kwargs={'value': 10, 'multiplier': 5},
    )

    summary_task = PythonOperator(
        task_id='summary_task',
        python_callable=summarize,
    )

    end = EmptyOperator(task_id='end')

    # Task dependencies
    #
    #         +-> hello_task -----------+
    # start --+                         +-> process_task --+
    #         +-> date_task --+---------+                  +-> summary_task -> end
    #                         +-> bash_task ---------------+

    start >> [hello_task, date_task]
    hello_task >> process_task
    date_task >> [bash_task, process_task]
    [bash_task, process_task] >> summary_task >> end
121
122
if __name__ == "__main__":
    # Local test: runs when this file is executed directly as a script.
    dag.test()