Prefect 모던 오케스트레이션
Prefect 모던 오케스트레이션¶
개요¶
Prefect는 현대적인 워크플로우 오케스트레이션 도구로, Python 네이티브 방식으로 데이터 파이프라인을 구축합니다. Airflow와 비교하여 더 간단한 설정과 동적 워크플로우를 지원합니다.
1. Prefect 개요¶
1.1 Prefect vs Airflow¶
┌────────────────────────────────────────────────────────────────┐
│ Prefect vs Airflow 비교 │
├────────────────────────────────────────────────────────────────┤
│ │
│ Airflow: Prefect: │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ DAG (Static) │ │ Flow (Dynamic)│ │
│ │ │ │ │ │
│ │ - 정적 정의 │ │ - 동적 생성 │ │
│ │ - 파일 기반 │ │ - Python 코드 │ │
│ │ - Scheduler │ │ - 이벤트 기반 │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ 실행 모델: 실행 모델: │
│ Scheduler → Worker Trigger → Work Pool → Worker │
│ │
└────────────────────────────────────────────────────────────────┘
| 특성 | Airflow | Prefect |
|---|---|---|
| 정의 방식 | DAG 파일 | Python 데코레이터 |
| 스케줄링 | Scheduler 프로세스 | 이벤트 기반, 서버리스 |
| 동적 워크플로우 | 제한적 | 네이티브 지원 |
| 로컬 실행 | 복잡한 설정 | 즉시 가능 |
| 상태 관리 | DB 필수 | 선택적 |
| 학습 곡선 | 가파름 | 완만함 |
1.2 Prefect 아키텍처¶
┌────────────────────────────────────────────────────────────────┐
│ Prefect Architecture │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Prefect Cloud / Server │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ UI │ │ API │ │ Automations │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────┘ │
│ ↑ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Work Pools │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Process │ │ Docker │ │ K8s │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────┘ │
│ ↑ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Workers │ │
│ │ (Flow 실행 에이전트) │ │
│ └─────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
2. 설치 및 시작하기¶
2.1 설치¶
# 기본 설치
pip install prefect
# 추가 통합 설치
pip install "prefect[aws]" # AWS 통합
pip install "prefect[gcp]" # GCP 통합
pip install "prefect[dask]" # Dask 통합
# 버전 확인
prefect version
2.2 Prefect Cloud 연결 (선택사항)¶
# Prefect Cloud 로그인
prefect cloud login
# 또는 API 키 사용
prefect cloud login --key YOUR_API_KEY
# Self-hosted 서버 연결
prefect config set PREFECT_API_URL="http://localhost:4200/api"
2.3 로컬 서버 실행¶
# Prefect 서버 시작 (UI 포함)
prefect server start
# UI 접속: http://localhost:4200
3. Flow와 Task 기본¶
3.1 기본 Flow 작성¶
from prefect import flow, task
from prefect.logging import get_run_logger
# Task 정의
@task
def extract_data(source: str) -> dict:
"""데이터 추출 Task"""
logger = get_run_logger()
logger.info(f"Extracting from {source}")
# 실제로는 DB, API 등에서 추출
data = {"source": source, "records": [1, 2, 3, 4, 5]}
return data
@task
def transform_data(data: dict) -> dict:
"""데이터 변환 Task"""
logger = get_run_logger()
logger.info(f"Transforming {len(data['records'])} records")
# 변환 로직
data["records"] = [x * 2 for x in data["records"]]
data["transformed"] = True
return data
@task
def load_data(data: dict, destination: str) -> bool:
"""데이터 적재 Task"""
logger = get_run_logger()
logger.info(f"Loading to {destination}")
# 실제로는 DB, 파일 등에 저장
print(f"Loaded data: {data}")
return True
# Flow 정의
@flow(name="ETL Pipeline")
def etl_pipeline(source: str = "database", destination: str = "warehouse"):
"""ETL 파이프라인 Flow"""
# Task 실행 (자동 의존성 관리)
raw_data = extract_data(source)
transformed = transform_data(raw_data)
result = load_data(transformed, destination)
return result
# 로컬 실행
if __name__ == "__main__":
etl_pipeline()
3.2 Task 옵션¶
from prefect import task
from datetime import timedelta
@task(
name="My Task",
description="Task 설명",
tags=["etl", "production"],
retries=3, # 재시도 횟수
retry_delay_seconds=60, # 재시도 대기 시간
timeout_seconds=3600, # 타임아웃
cache_key_fn=lambda: "static_key", # 캐시 키
cache_expiration=timedelta(hours=1), # 캐시 만료
log_prints=True, # print 문을 로그로 캡처
)
def my_task(param: str) -> str:
print(f"Processing: {param}")
return f"Result: {param}"
# 동적 재시도 (exponential backoff)
from prefect.tasks import exponential_backoff
@task(
retries=5,
retry_delay_seconds=exponential_backoff(backoff_factor=10),
)
def flaky_task():
"""불안정한 외부 API 호출"""
import random
if random.random() < 0.7:
raise Exception("Random failure")
return "Success"
3.3 Flow 옵션¶
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner
@flow(
name="My Flow",
description="Flow 설명",
version="1.0.0",
retries=2,
retry_delay_seconds=300,
timeout_seconds=7200,
task_runner=ConcurrentTaskRunner(), # 병렬 실행
log_prints=True,
persist_result=True, # 결과 저장
)
def my_flow():
pass
# 순차 실행
@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
pass
4. 동적 워크플로우¶
4.1 동적 Task 생성¶
from prefect import flow, task
@task
def process_item(item: str) -> str:
return f"Processed: {item}"
@flow
def dynamic_tasks_flow(items: list[str]):
"""동적으로 Task 수 결정"""
results = []
for item in items:
result = process_item(item)
results.append(result)
return results
# 실행
dynamic_tasks_flow(["a", "b", "c", "d"])
# 병렬 실행 (.submit() 사용)
@flow
def parallel_tasks_flow(items: list[str]):
"""병렬로 Task 실행"""
futures = []
for item in items:
# .submit()은 Future 반환 (비동기)
future = process_item.submit(item)
futures.append(future)
# 결과 수집
results = [f.result() for f in futures]
return results
4.2 조건부 실행¶
from prefect import flow, task
@task
def check_condition(data: dict) -> bool:
return data.get("count", 0) > 100
@task
def process_large(data: dict):
print(f"Processing large dataset: {data['count']} records")
@task
def process_small(data: dict):
print(f"Processing small dataset: {data['count']} records")
@flow
def conditional_flow(data: dict):
"""조건에 따른 분기"""
is_large = check_condition(data)
if is_large:
process_large(data)
else:
process_small(data)
# 실행
conditional_flow({"count": 150}) # process_large 실행
conditional_flow({"count": 50}) # process_small 실행
4.3 서브플로우¶
from prefect import flow, task
@task
def extract(source: str) -> list:
return [1, 2, 3, 4, 5]
@task
def transform(data: list) -> list:
return [x * 2 for x in data]
@task
def load(data: list, target: str):
print(f"Loading {len(data)} records to {target}")
# 서브플로우 정의
@flow(name="ETL Subflow")
def etl_subflow(source: str, target: str):
"""재사용 가능한 ETL 서브플로우"""
data = extract(source)
transformed = transform(data)
load(transformed, target)
return len(transformed)
# 메인 플로우
@flow(name="Main Pipeline")
def main_pipeline():
"""여러 서브플로우 오케스트레이션"""
# 서브플로우 호출
count_a = etl_subflow("source_a", "target_a")
count_b = etl_subflow("source_b", "target_b")
count_c = etl_subflow("source_c", "target_c")
print(f"Total processed: {count_a + count_b + count_c}")
main_pipeline()
5. 배포 (Deployment)¶
5.1 Deployment 생성¶
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
@flow
def my_etl_flow(date: str = None):
"""일일 ETL 플로우"""
from datetime import datetime
date = date or datetime.now().strftime("%Y-%m-%d")
print(f"Running ETL for {date}")
# 방법 1: Python으로 Deployment 생성
deployment = Deployment.build_from_flow(
flow=my_etl_flow,
name="daily-etl",
version="1.0",
tags=["production", "etl"],
schedule=CronSchedule(cron="0 6 * * *"), # 매일 오전 6시
parameters={"date": None},
work_pool_name="default-agent-pool",
)
# Deployment 적용
deployment.apply()
5.2 CLI로 Deployment 생성¶
# prefect.yaml 생성
prefect init
# Deployment 빌드 및 적용
prefect deploy --name daily-etl
# prefect.yaml 예시
name: my-project
prefect-version: 2.14.0
deployments:
- name: daily-etl
entrypoint: flows/etl.py:my_etl_flow
work_pool:
name: default-agent-pool
schedule:
cron: "0 6 * * *"
parameters:
date: null
tags:
- production
- etl
5.3 Work Pool 및 Worker¶
# Work Pool 생성
prefect work-pool create my-pool --type process
# Worker 시작
prefect worker start --pool my-pool
# Docker 기반 Work Pool
prefect work-pool create docker-pool --type docker
# Kubernetes 기반 Work Pool
prefect work-pool create k8s-pool --type kubernetes
6. Airflow와의 비교 예제¶
6.1 Airflow 버전¶
# Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract(**kwargs):
ti = kwargs['ti']
data = [1, 2, 3, 4, 5]
ti.xcom_push(key='data', value=data)
def transform(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(key='data', task_ids='extract')
result = [x * 2 for x in data]
ti.xcom_push(key='result', value=result)
def load(**kwargs):
ti = kwargs['ti']
result = ti.xcom_pull(key='result', task_ids='transform')
print(f"Loading: {result}")
with DAG(
'etl_airflow',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False,
) as dag:
t1 = PythonOperator(task_id='extract', python_callable=extract)
t2 = PythonOperator(task_id='transform', python_callable=transform)
t3 = PythonOperator(task_id='load', python_callable=load)
t1 >> t2 >> t3
6.2 Prefect 버전¶
# Prefect Flow - 훨씬 간단하고 직관적
from prefect import flow, task
@task
def extract() -> list:
return [1, 2, 3, 4, 5]
@task
def transform(data: list) -> list:
return [x * 2 for x in data]
@task
def load(data: list):
print(f"Loading: {data}")
@flow
def etl_prefect():
data = extract()
transformed = transform(data)
load(transformed)
# 로컬 실행
etl_prefect()
6.3 주요 차이점¶
"""
1. 데이터 전달:
- Airflow: XCom 사용 (명시적 push/pull)
- Prefect: 함수 반환값 직접 사용 (자연스러운 Python)
2. 의존성:
- Airflow: >> 연산자로 명시
- Prefect: 함수 호출 순서로 자동 추론
3. 스케줄링:
- Airflow: Scheduler 프로세스 필수
- Prefect: 선택적, 이벤트 기반 가능
4. 로컬 테스트:
- Airflow: 복잡한 설정 필요
- Prefect: 일반 Python 함수처럼 실행
5. 동적 워크플로우:
- Airflow: 제한적 지원
- Prefect: 네이티브 Python 제어문 사용
"""
7. 고급 기능¶
7.1 상태 핸들러¶
from prefect import flow, task
from prefect.states import State, Completed, Failed
def custom_state_handler(task, task_run, state: State):
"""Task 상태 변경 시 호출"""
if state.is_failed():
# 슬랙 알림 등
print(f"Task {task.name} failed!")
return state
@task(on_failure=[custom_state_handler])
def risky_task():
raise ValueError("Something went wrong")
# Flow 레벨 핸들러
@flow(on_failure=[lambda flow, flow_run, state: print("Flow failed!")])
def my_flow():
risky_task()
7.2 결과 저장소¶
from prefect import flow, task
from prefect.filesystems import S3, LocalFileSystem
from prefect.serializers import JSONSerializer
# 로컬 파일 시스템
@task(result_storage=LocalFileSystem(basepath="/tmp/prefect"))
def save_locally():
return {"data": [1, 2, 3]}
# S3 저장소
@task(
persist_result=True,
result_storage=S3(bucket_path="my-bucket/results"),
result_serializer=JSONSerializer(),
)
def save_to_s3():
return {"large": "data"}
7.3 비밀 관리¶
from prefect.blocks.system import Secret
# Block으로 비밀 저장 (UI 또는 CLI)
# prefect block register -m prefect.blocks.system
# 코드에서 사용
@task
def use_secret():
api_key = Secret.load("my-api-key").get()
# API 호출에 사용
return f"Using key: {api_key[:4]}..."
# 환경 변수 사용
import os
@task
def use_env_var():
return os.getenv("MY_SECRET")
연습 문제¶
문제 1: 기본 Flow 작성¶
3개의 Task(데이터 추출, 변환, 적재)로 구성된 ETL Flow를 작성하세요.
문제 2: 동적 Task¶
파일 목록을 입력받아 각 파일을 병렬로 처리하는 Flow를 작성하세요.
문제 3: 조건부 실행¶
데이터 크기에 따라 다른 처리 방식을 선택하는 Flow를 작성하세요.
요약¶
| 개념 | 설명 |
|---|---|
| Flow | 워크플로우 정의 (Airflow의 DAG) |
| Task | 개별 작업 단위 |
| Deployment | Flow의 배포 설정 |
| Work Pool | Worker 그룹 관리 |
| Worker | Flow 실행 에이전트 |