simple_dag.py

Download
python 126 lines 3.2 KB
"""
Basic Airflow DAG example.

This DAG demonstrates a basic Airflow workflow:
- Running Python functions with PythonOperator
- Running shell commands with BashOperator
- Defining task dependencies

Run: airflow dags test simple_dag 2024-01-01
"""
 11
 12from datetime import datetime, timedelta
 13from airflow import DAG
 14from airflow.operators.python import PythonOperator
 15from airflow.operators.bash import BashOperator
 16from airflow.operators.empty import EmptyOperator
 17
 18
 19# ๊ธฐ๋ณธ ์ธ์ž ์„ค์ •
 20default_args = {
 21    'owner': 'data_team',
 22    'depends_on_past': False,
 23    'email': ['data-alerts@example.com'],
 24    'email_on_failure': False,
 25    'email_on_retry': False,
 26    'retries': 1,
 27    'retry_delay': timedelta(minutes=5),
 28}
 29
 30
# Python function definitions
 32def print_hello():
 33    """์ธ์‚ฌ ๋ฉ”์‹œ์ง€ ์ถœ๋ ฅ"""
 34    print("Hello from Airflow!")
 35    return "hello_returned"
 36
 37
 38def print_date(**context):
 39    """์‹คํ–‰ ๋‚ ์งœ ์ถœ๋ ฅ"""
 40    execution_date = context['ds']
 41    print(f"Execution date: {execution_date}")
 42    return execution_date
 43
 44
 45def process_data(value: int, multiplier: int = 2, **context):
 46    """๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์˜ˆ์‹œ"""
 47    result = value * multiplier
 48    print(f"Processing: {value} * {multiplier} = {result}")
 49
 50    # XCom์œผ๋กœ ๊ฒฐ๊ณผ ์ €์žฅ
 51    context['ti'].xcom_push(key='processed_value', value=result)
 52    return result
 53
 54
 55def summarize(**context):
 56    """์ด์ „ Task ๊ฒฐ๊ณผ ์š”์•ฝ"""
 57    ti = context['ti']
 58
 59    # XCom์—์„œ ๊ฐ’ ๊ฐ€์ ธ์˜ค๊ธฐ
 60    hello_result = ti.xcom_pull(task_ids='hello_task')
 61    processed_value = ti.xcom_pull(task_ids='process_task', key='processed_value')
 62
 63    print(f"Summary:")
 64    print(f"  - Hello result: {hello_result}")
 65    print(f"  - Processed value: {processed_value}")
 66    print(f"  - Execution date: {context['ds']}")
 67
 68
 69# DAG ์ •์˜
 70with DAG(
 71    dag_id='simple_dag',
 72    default_args=default_args,
 73    description='๊ฐ„๋‹จํ•œ Airflow DAG ์˜ˆ์ œ',
 74    schedule_interval='@daily',  # ๋งค์ผ ์‹คํ–‰
 75    start_date=datetime(2024, 1, 1),
 76    catchup=False,  # ๊ณผ๊ฑฐ ์‹คํ–‰ ๊ฑด๋„ˆ๋›ฐ๊ธฐ
 77    tags=['example', 'tutorial'],
 78) as dag:
 79
 80    # Task ์ •์˜
 81    start = EmptyOperator(task_id='start')
 82
 83    hello_task = PythonOperator(
 84        task_id='hello_task',
 85        python_callable=print_hello,
 86    )
 87
 88    date_task = PythonOperator(
 89        task_id='date_task',
 90        python_callable=print_date,
 91    )
 92
 93    bash_task = BashOperator(
 94        task_id='bash_task',
 95        bash_command='echo "Current time: $(date)" && sleep 2',
 96    )
 97
 98    process_task = PythonOperator(
 99        task_id='process_task',
100        python_callable=process_data,
101        op_kwargs={'value': 10, 'multiplier': 5},
102    )
103
104    summary_task = PythonOperator(
105        task_id='summary_task',
106        python_callable=summarize,
107    )
108
109    end = EmptyOperator(task_id='end')
110
111    # Task ์˜์กด์„ฑ ์ •์˜
112    #     โ”Œโ”€ hello_task โ”€โ”
113    # start โ”€โ”ค             โ”œโ”€ process_task โ”€ summary_task โ”€ end
114    #     โ””โ”€ date_task โ”€โ”€โ”˜
115    #             โ””โ”€ bash_task โ”€โ”€โ”˜
116
117    start >> [hello_task, date_task]
118    hello_task >> process_task
119    date_task >> [bash_task, process_task]
120    [bash_task, process_task] >> summary_task >> end
121
122
if __name__ == "__main__":
    # Local test: execute this DAG directly when the file is run as a script.
    dag.test()