ML 프로젝트 라이프사이클
ML 프로젝트 라이프사이클¶
1. ML 프로젝트 단계 개요¶
머신러닝 프로젝트는 단순히 모델을 학습시키는 것을 넘어, 데이터 수집부터 모니터링까지 전체 생명주기를 관리해야 합니다.
┌─────────────────────────────────────────────────────────────────────┐
│ ML 프로젝트 라이프사이클 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 문제정의 │───▶│ 데이터 │───▶│ 피처 │───▶│ 모델 │ │
│ │ │ │ 수집/준비 │ │ 엔지니어링│ │ 학습 │ │
│ └──────────┘ └──────────┘ └──────────┘ └────┬─────┘ │
│ │ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ 모니터링 │◀───│ 배포 │◀───│ 검증 │◀────────┘ │
│ │ │ │ │ │ │ │
│ └────┬─────┘ └──────────┘ └──────────┘ │
│ │ │
│ └──────────────── 재학습 ─────────────────────────────▶ │
│ │
└─────────────────────────────────────────────────────────────────────┘
2. 문제 정의 및 범위 설정¶
2.1 비즈니스 목표 정의¶
"""
ML 프로젝트 문제 정의 템플릿
"""
project_definition = {
# 비즈니스 목표
"business_objective": "고객 이탈률 30% 감소",
# ML 문제 정의
"ml_problem": {
"type": "binary_classification",
"target": "is_churned",
"success_metric": "precision_at_recall_80",
"baseline": 0.65
},
# 제약사항
"constraints": {
"latency": "< 100ms",
"throughput": "1000 req/s",
"model_size": "< 500MB",
"interpretability": "high" # 규제 요구사항
},
# 데이터 요구사항
"data_requirements": {
"historical_period": "2 years",
"minimum_samples": 100000,
"features": ["usage_patterns", "demographics", "support_tickets"]
}
}
2.2 성공 기준 정의¶
"""
모델 성능 기준 정의
"""
success_criteria = {
# 오프라인 메트릭 (모델 품질)
"offline_metrics": {
"accuracy": {"min": 0.85, "target": 0.90},
"precision": {"min": 0.80, "target": 0.85},
"recall": {"min": 0.75, "target": 0.80},
"auc_roc": {"min": 0.85, "target": 0.90}
},
# 온라인 메트릭 (비즈니스 영향)
"online_metrics": {
"churn_rate_reduction": {"target": "30%"},
"false_positive_cost": {"max": "$10K/month"}
},
# 시스템 메트릭
"system_metrics": {
"p99_latency": {"max": "100ms"},
"availability": {"min": "99.9%"},
"throughput": {"min": "1000 req/s"}
}
}
3. 데이터 수집 및 준비¶
3.1 데이터 파이프라인¶
"""
데이터 수집 파이프라인 예시
"""
from typing import Dict, Any
import pandas as pd
from datetime import datetime, timedelta
class DataPipeline:
"""데이터 수집 및 준비 파이프라인"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.data_sources = config["data_sources"]
def extract(self) -> Dict[str, pd.DataFrame]:
"""다양한 소스에서 데이터 추출"""
data = {}
# 데이터베이스에서 추출
data["transactions"] = self.query_database(
query="SELECT * FROM transactions WHERE date > ?",
params=[self.config["start_date"]]
)
# S3에서 추출
data["user_events"] = self.read_from_s3(
bucket="data-lake",
prefix=f"events/{self.config['date_partition']}/"
)
# API에서 추출
data["external_features"] = self.fetch_from_api(
endpoint=self.config["external_api"]
)
return data
def transform(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""데이터 변환 및 전처리"""
# 데이터 조인
df = raw_data["transactions"].merge(
raw_data["user_events"],
on="user_id",
how="left"
)
# 결측치 처리
df = self.handle_missing(df)
# 이상치 처리
df = self.handle_outliers(df)
# 데이터 타입 변환
df = self.convert_types(df)
return df
def validate(self, df: pd.DataFrame) -> bool:
"""데이터 품질 검증"""
validations = {
"row_count": len(df) > self.config["min_rows"],
"null_ratio": df.isnull().mean().max() < 0.1,
"schema_match": self.check_schema(df),
"value_ranges": self.check_value_ranges(df)
}
return all(validations.values())
def load(self, df: pd.DataFrame, destination: str):
"""처리된 데이터 저장"""
# 버전 정보 추가
df["_data_version"] = self.config["version"]
df["_processed_at"] = datetime.now()
# 저장
df.to_parquet(
f"{destination}/data_v{self.config['version']}.parquet",
index=False
)
3.2 데이터 버전 관리 (DVC)¶
# dvc.yaml - DVC 파이프라인 정의
stages:
prepare_data:
cmd: python src/data/prepare.py
deps:
- src/data/prepare.py
- data/raw/
outs:
- data/processed/train.parquet
- data/processed/test.parquet
train:
cmd: python src/train.py
deps:
- src/train.py
- data/processed/train.parquet
params:
- train.epochs
- train.learning_rate
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
# DVC 기본 명령어
# 데이터 추적 시작
dvc add data/raw/dataset.csv
# 파이프라인 실행
dvc repro
# 버전 간 차이 확인
dvc diff
# 데이터 가져오기 (원격 저장소)
dvc pull
4. 피처 엔지니어링¶
4.1 피처 정의 및 계산¶
"""
피처 엔지니어링 파이프라인
"""
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
class FeatureEngineer:
"""피처 엔지니어링 클래스"""
def __init__(self, feature_config: dict):
self.config = feature_config
self.encoders = {}
self.scalers = {}
def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""피처 생성"""
features = pd.DataFrame()
# 시간 기반 피처
features["hour"] = df["timestamp"].dt.hour
features["day_of_week"] = df["timestamp"].dt.dayofweek
features["is_weekend"] = features["day_of_week"].isin([5, 6]).astype(int)
# 집계 피처
features["total_purchases_30d"] = self.rolling_aggregate(
df, "purchase_amount", window=30, agg="sum"
)
features["avg_session_duration_7d"] = self.rolling_aggregate(
df, "session_duration", window=7, agg="mean"
)
# 비율 피처
features["purchase_frequency"] = (
df["purchase_count"] / df["days_since_signup"]
).fillna(0)
# 상호작용 피처
features["value_per_session"] = (
df["total_purchase_value"] / df["session_count"]
).fillna(0)
return features
def encode_categoricals(self, df: pd.DataFrame) -> pd.DataFrame:
"""범주형 변수 인코딩"""
for col in self.config["categorical_features"]:
if col not in self.encoders:
self.encoders[col] = LabelEncoder()
df[col] = self.encoders[col].fit_transform(df[col])
else:
df[col] = self.encoders[col].transform(df[col])
return df
def scale_numericals(self, df: pd.DataFrame) -> pd.DataFrame:
"""수치형 변수 스케일링"""
numerical_cols = self.config["numerical_features"]
if "standard" not in self.scalers:
self.scalers["standard"] = StandardScaler()
df[numerical_cols] = self.scalers["standard"].fit_transform(
df[numerical_cols]
)
else:
df[numerical_cols] = self.scalers["standard"].transform(
df[numerical_cols]
)
return df
def save_transformers(self, path: str):
"""인코더/스케일러 저장"""
import joblib
joblib.dump({
"encoders": self.encoders,
"scalers": self.scalers
}, path)
4.2 피처 스토어 연동¶
"""
Feature Store 사용 예시 (Feast)
"""
from feast import FeatureStore
# Feature Store 초기화
fs = FeatureStore(repo_path="./feature_repo")
# 피처 가져오기 (학습용 - 오프라인)
training_df = fs.get_historical_features(
entity_df=entity_df, # entity_id, event_timestamp
features=[
"user_features:total_purchases",
"user_features:avg_session_duration",
"product_features:category",
"product_features:price_range"
]
).to_df()
# 피처 가져오기 (추론용 - 온라인)
feature_vector = fs.get_online_features(
features=[
"user_features:total_purchases",
"user_features:avg_session_duration"
],
entity_rows=[{"user_id": 12345}]
).to_dict()
5. 모델 학습¶
5.1 실험 관리¶
"""
실험 관리가 포함된 모델 학습
"""
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import optuna
class ModelTrainer:
"""모델 학습 클래스"""
def __init__(self, experiment_name: str):
mlflow.set_experiment(experiment_name)
def train_with_tracking(
self,
X_train, y_train,
X_val, y_val,
params: dict
):
"""MLflow 추적이 포함된 학습"""
with mlflow.start_run():
# 파라미터 로깅
mlflow.log_params(params)
# 데이터 정보 로깅
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
# 모델 학습
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# 검증
val_predictions = model.predict(X_val)
val_proba = model.predict_proba(X_val)[:, 1]
# 메트릭 계산 및 로깅
metrics = self.calculate_metrics(y_val, val_predictions, val_proba)
mlflow.log_metrics(metrics)
# 모델 저장
mlflow.sklearn.log_model(
model, "model",
signature=mlflow.models.infer_signature(X_train, val_predictions)
)
# 피처 중요도 저장
self.log_feature_importance(model, X_train.columns)
return model, metrics
def hyperparameter_tuning(self, X, y, n_trials: int = 100):
"""Optuna를 이용한 하이퍼파라미터 튜닝"""
def objective(trial):
params = {
"n_estimators": trial.suggest_int("n_estimators", 50, 300),
"max_depth": trial.suggest_int("max_depth", 3, 15),
"min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
"min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 10)
}
model = RandomForestClassifier(**params, random_state=42)
scores = cross_val_score(model, X, y, cv=5, scoring="roc_auc")
return scores.mean()
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=n_trials)
# 최적 파라미터 로깅
with mlflow.start_run(run_name="best_params"):
mlflow.log_params(study.best_params)
mlflow.log_metric("best_auc", study.best_value)
return study.best_params
5.2 학습 파이프라인¶
# training_pipeline.yaml
pipeline:
name: "churn-prediction-training"
schedule: "0 2 * * *" # 매일 오전 2시
stages:
- name: data_validation
script: src/validate_data.py
inputs:
- data/raw/
outputs:
- reports/data_validation.html
- name: feature_engineering
script: src/feature_engineering.py
inputs:
- data/raw/
outputs:
- data/features/
- name: train
script: src/train.py
inputs:
- data/features/
params:
- config/train_config.yaml
outputs:
- models/
- name: evaluate
script: src/evaluate.py
inputs:
- models/
- data/features/test.parquet
outputs:
- reports/evaluation.html
6. 모델 검증 및 테스트¶
6.1 모델 품질 게이트¶
"""
모델 품질 검증
"""
from typing import Dict, Any
import numpy as np
class ModelValidator:
"""모델 검증 클래스"""
def __init__(self, quality_gates: Dict[str, float]):
self.quality_gates = quality_gates
def validate(self, metrics: Dict[str, float]) -> Dict[str, Any]:
"""품질 게이트 검증"""
results = {
"passed": True,
"details": {}
}
for metric_name, threshold in self.quality_gates.items():
actual_value = metrics.get(metric_name, 0)
passed = actual_value >= threshold
results["details"][metric_name] = {
"threshold": threshold,
"actual": actual_value,
"passed": passed
}
if not passed:
results["passed"] = False
return results
def compare_with_baseline(
self,
new_metrics: Dict[str, float],
baseline_metrics: Dict[str, float],
min_improvement: float = 0.01
) -> Dict[str, Any]:
"""베이스라인 모델과 비교"""
results = {"improved": True, "details": {}}
for metric_name in new_metrics:
new_val = new_metrics[metric_name]
baseline_val = baseline_metrics.get(metric_name, 0)
improvement = (new_val - baseline_val) / baseline_val if baseline_val else 0
results["details"][metric_name] = {
"new": new_val,
"baseline": baseline_val,
"improvement": f"{improvement:.2%}"
}
# 성능 저하 체크
if new_val < baseline_val * (1 - min_improvement):
results["improved"] = False
return results
# 사용 예시
validator = ModelValidator({
"accuracy": 0.85,
"precision": 0.80,
"recall": 0.75,
"auc_roc": 0.85
})
validation_result = validator.validate(model_metrics)
if not validation_result["passed"]:
raise ValueError(f"Model failed quality gates: {validation_result}")
6.2 A/B 테스트 준비¶
"""
A/B 테스트 설정
"""
ab_test_config = {
"experiment_name": "churn_model_v2",
"variants": {
"control": {
"model_version": "v1.2.3",
"traffic_percentage": 50
},
"treatment": {
"model_version": "v2.0.0",
"traffic_percentage": 50
}
},
"metrics": {
"primary": "conversion_rate",
"secondary": ["latency_p99", "error_rate"]
},
"duration_days": 14,
"min_sample_size": 10000
}
7. 배포¶
7.1 배포 전략¶
"""
모델 배포 전략
"""
deployment_strategies = {
"blue_green": {
"description": "새 버전을 별도 환경에 배포 후 트래픽 전환",
"rollback": "즉시 가능 (이전 환경으로 트래픽 전환)",
"use_case": "다운타임 최소화 필요시"
},
"canary": {
"description": "일부 트래픽만 새 버전으로 점진적 전환",
"rollback": "트래픽 비율 조정으로 가능",
"use_case": "리스크 최소화, A/B 테스트"
},
"shadow": {
"description": "실제 트래픽 복제하여 새 모델 테스트 (결과 미반영)",
"rollback": "불필요 (프로덕션 영향 없음)",
"use_case": "새 모델 검증"
}
}
7.2 배포 코드¶
"""
모델 배포 자동화
"""
import mlflow
from mlflow.tracking import MlflowClient
class ModelDeployer:
"""모델 배포 클래스"""
def __init__(self, registry_uri: str):
self.client = MlflowClient(registry_uri)
def promote_to_production(
self,
model_name: str,
version: str,
archive_current: bool = True
):
"""모델을 프로덕션으로 승격"""
# 현재 프로덕션 모델 아카이브
if archive_current:
current_prod = self.get_production_model(model_name)
if current_prod:
self.client.transition_model_version_stage(
name=model_name,
version=current_prod.version,
stage="Archived"
)
# 새 버전을 프로덕션으로
self.client.transition_model_version_stage(
name=model_name,
version=version,
stage="Production"
)
print(f"Model {model_name} v{version} promoted to Production")
def rollback(self, model_name: str):
"""이전 버전으로 롤백"""
# 아카이브된 버전 중 가장 최근 버전 찾기
versions = self.client.search_model_versions(
f"name='{model_name}'"
)
archived = [v for v in versions if v.current_stage == "Archived"]
if not archived:
raise ValueError("No archived version available for rollback")
latest_archived = max(archived, key=lambda x: int(x.version))
# 현재 프로덕션 아카이브
current_prod = self.get_production_model(model_name)
if current_prod:
self.client.transition_model_version_stage(
name=model_name,
version=current_prod.version,
stage="Archived"
)
# 롤백 실행
self.client.transition_model_version_stage(
name=model_name,
version=latest_archived.version,
stage="Production"
)
print(f"Rolled back to v{latest_archived.version}")
8. 모니터링¶
8.1 모니터링 메트릭¶
"""
모델 모니터링 설정
"""
monitoring_config = {
# 모델 성능 메트릭
"model_metrics": {
"accuracy": {"threshold": 0.85, "alert_on": "below"},
"latency_p99": {"threshold": 100, "alert_on": "above", "unit": "ms"},
"error_rate": {"threshold": 0.01, "alert_on": "above"}
},
# 데이터 드리프트 메트릭
"drift_metrics": {
"psi": {"threshold": 0.1, "alert_on": "above"}, # Population Stability Index
"ks_statistic": {"threshold": 0.1, "alert_on": "above"}
},
# 시스템 메트릭
"system_metrics": {
"cpu_usage": {"threshold": 80, "alert_on": "above", "unit": "%"},
"memory_usage": {"threshold": 80, "alert_on": "above", "unit": "%"},
"gpu_utilization": {"threshold": 90, "alert_on": "above", "unit": "%"}
}
}
8.2 재학습 트리거¶
"""
자동 재학습 트리거
"""
class RetrainingTrigger:
"""재학습 트리거 클래스"""
def __init__(self, config: dict):
self.config = config
def check_triggers(self, metrics: dict) -> dict:
"""재학습 필요 여부 확인"""
triggers = {
"should_retrain": False,
"reasons": []
}
# 1. 성능 저하 체크
if metrics.get("accuracy", 1.0) < self.config["min_accuracy"]:
triggers["should_retrain"] = True
triggers["reasons"].append("accuracy_degradation")
# 2. 데이터 드리프트 체크
if metrics.get("psi", 0) > self.config["max_psi"]:
triggers["should_retrain"] = True
triggers["reasons"].append("data_drift")
# 3. 시간 기반 재학습
days_since_training = metrics.get("days_since_training", 0)
if days_since_training > self.config["max_days_without_training"]:
triggers["should_retrain"] = True
triggers["reasons"].append("scheduled_retrain")
# 4. 새 데이터 임계치
if metrics.get("new_data_count", 0) > self.config["new_data_threshold"]:
triggers["should_retrain"] = True
triggers["reasons"].append("new_data_available")
return triggers
# 설정 예시
retrain_config = {
"min_accuracy": 0.85,
"max_psi": 0.1,
"max_days_without_training": 30,
"new_data_threshold": 100000
}
trigger = RetrainingTrigger(retrain_config)
result = trigger.check_triggers(current_metrics)
if result["should_retrain"]:
print(f"Triggering retrain due to: {result['reasons']}")
# trigger_training_pipeline()
9. 버전 관리 전략¶
9.1 전체 버전 관리¶
# version_management.yaml
versioning:
# 데이터 버전
data:
strategy: "semantic" # v1.0.0
storage: "dvc"
format: "parquet"
# 코드 버전
code:
strategy: "git"
branching: "git-flow"
# 모델 버전
model:
strategy: "semantic"
registry: "mlflow"
stages: ["None", "Staging", "Production", "Archived"]
# 피처 버전
features:
strategy: "semantic"
store: "feast"
# 연결 관계 추적
lineage:
data_version -> code_version -> model_version
features_version -> model_version
9.2 시맨틱 버전 관리¶
"""
모델 시맨틱 버전 관리
"""
# 버전 형식: MAJOR.MINOR.PATCH
# MAJOR: 호환되지 않는 변경 (새 아키텍처, 피처 스키마 변경)
# MINOR: 기능 추가 (새 피처, 하이퍼파라미터 변경)
# PATCH: 버그 수정, 재학습
version_examples = {
"1.0.0": "초기 프로덕션 릴리스",
"1.0.1": "동일 데이터/피처로 재학습",
"1.1.0": "새 피처 추가",
"1.2.0": "하이퍼파라미터 최적화",
"2.0.0": "모델 아키텍처 변경 (RF -> XGBoost)"
}
연습 문제¶
문제 1: 파이프라인 설계¶
이커머스 추천 시스템의 ML 파이프라인을 설계하세요. 데이터 수집부터 모니터링까지 각 단계를 정의하세요.
문제 2: 재학습 정책¶
다음 상황에서 재학습 정책을 설계하세요: - 일일 신규 주문 10만 건 - 계절성이 강한 상품 판매 - 모델 추론 latency 50ms 이하 요구
요약¶
| 단계 | 주요 활동 | 핵심 산출물 |
|---|---|---|
| 문제 정의 | 비즈니스 목표, ML 문제 정의 | 프로젝트 문서 |
| 데이터 준비 | 수집, 검증, 버전 관리 | 검증된 데이터셋 |
| 피처 엔지니어링 | 피처 생성, 변환 | 피처 파이프라인 |
| 모델 학습 | 학습, 실험 관리 | 학습된 모델, 메트릭 |
| 검증 | 품질 게이트, A/B 테스트 | 검증 리포트 |
| 배포 | Blue/Green, Canary | 서빙 엔드포인트 |
| 모니터링 | 성능, 드리프트 감지 | 대시보드, 알림 |