드리프트 감지 & 모니터링
드리프트 감지 & 모니터링¶
1. 드리프트 개념¶
드리프트(Drift)는 시간이 지남에 따라 데이터나 모델 성능이 변화하는 현상입니다.
1.1 드리프트 유형¶
┌─────────────────────────────────────────────────────────────────────┐
│ 드리프트 유형 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Data Drift │ │
│ │ │ │
│ │ 입력 데이터의 분포가 변화 │ │
│ │ P(X)_train ≠ P(X)_production │ │
│ │ │ │
│ │ 예: 고객 연령대 분포 변화 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Concept Drift │ │
│ │ │ │
│ │ 입력과 출력 간의 관계가 변화 │ │
│ │ P(Y|X)_train ≠ P(Y|X)_production │ │
│ │ │ │
│ │ 예: 경제 상황 변화로 이탈 패턴 변화 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Label Drift │ │
│ │ │ │
│ │ 타겟 변수의 분포가 변화 │ │
│ │ P(Y)_train ≠ P(Y)_production │ │
│ │ │ │
│ │ 예: 사기 비율 증가 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 드리프트의 원인¶
"""
드리프트 원인 분석
"""
drift_causes = {
"데이터 수집 변화": {
"examples": ["센서 오작동", "데이터 소스 변경", "로깅 버그"],
"detection": "스키마 검증, 통계 모니터링"
},
"외부 환경 변화": {
"examples": ["계절성", "경쟁사 행동", "경제 상황", "규제 변화"],
"detection": "도메인 지식, 장기 트렌드 분석"
},
"사용자 행동 변화": {
"examples": ["신규 사용자 유입", "사용 패턴 변화", "세대 교체"],
"detection": "코호트 분석, A/B 테스트"
},
"피처 엔지니어링 오류": {
"examples": ["업스트림 데이터 변경", "전처리 버그"],
"detection": "피처 스토어 검증, 단위 테스트"
}
}
2. 드리프트 감지 기법¶
2.1 통계적 검정¶
"""
드리프트 감지 통계 기법
"""
import numpy as np
from scipy import stats
from typing import Tuple
def kolmogorov_smirnov_test(
reference: np.ndarray,
current: np.ndarray,
threshold: float = 0.05
) -> Tuple[float, bool]:
"""KS 검정 - 두 분포의 차이 검정"""
statistic, p_value = stats.ks_2samp(reference, current)
is_drift = p_value < threshold
return statistic, is_drift
def population_stability_index(
reference: np.ndarray,
current: np.ndarray,
n_bins: int = 10
) -> float:
"""PSI - 분포 안정성 지수"""
# 히스토그램 생성
bins = np.histogram_bin_edges(reference, bins=n_bins)
ref_hist, _ = np.histogram(reference, bins=bins, density=True)
cur_hist, _ = np.histogram(current, bins=bins, density=True)
# 0 방지
ref_hist = np.where(ref_hist == 0, 0.0001, ref_hist)
cur_hist = np.where(cur_hist == 0, 0.0001, cur_hist)
# PSI 계산
psi = np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist))
return psi
def wasserstein_distance(
reference: np.ndarray,
current: np.ndarray
) -> float:
"""Wasserstein 거리 (Earth Mover's Distance)"""
return stats.wasserstein_distance(reference, current)
def jensen_shannon_divergence(
reference: np.ndarray,
current: np.ndarray,
n_bins: int = 10
) -> float:
"""Jensen-Shannon Divergence"""
from scipy.spatial.distance import jensenshannon
bins = np.histogram_bin_edges(reference, bins=n_bins)
ref_hist, _ = np.histogram(reference, bins=bins, density=True)
cur_hist, _ = np.histogram(current, bins=bins, density=True)
return jensenshannon(ref_hist, cur_hist)
# 해석 기준
psi_thresholds = {
"PSI < 0.1": "변화 없음",
"0.1 <= PSI < 0.2": "약간의 변화",
"PSI >= 0.2": "심각한 변화 - 재학습 필요"
}
2.2 다변량 드리프트 감지¶
"""
다변량 드리프트 감지
"""
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
def domain_classifier_drift(
reference: np.ndarray,
current: np.ndarray,
threshold: float = 0.55
) -> Tuple[float, bool]:
"""
도메인 분류기 기반 드리프트 감지
- 참조 데이터와 현재 데이터를 구분하는 분류기 학습
- AUC가 0.5에 가까우면 드리프트 없음
"""
# 레이블 생성
X = np.vstack([reference, current])
y = np.hstack([
np.zeros(len(reference)),
np.ones(len(current))
])
# 분류기 학습 및 평가
clf = RandomForestClassifier(n_estimators=100, random_state=42)
scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
mean_auc = scores.mean()
# 0.5에서 멀수록 드리프트 가능성 높음
drift_score = abs(mean_auc - 0.5) * 2
is_drift = mean_auc > threshold
return drift_score, is_drift
def multivariate_drift_pca(
reference: np.ndarray,
current: np.ndarray,
n_components: int = 5
) -> float:
"""
PCA 기반 다변량 드리프트 감지
"""
from sklearn.decomposition import PCA
from scipy.spatial.distance import mahalanobis
# PCA로 차원 축소
pca = PCA(n_components=n_components)
ref_pca = pca.fit_transform(reference)
cur_pca = pca.transform(current)
# 각 차원의 평균과 공분산
ref_mean = np.mean(ref_pca, axis=0)
cur_mean = np.mean(cur_pca, axis=0)
ref_cov = np.cov(ref_pca.T)
# Mahalanobis 거리
try:
distance = mahalanobis(ref_mean, cur_mean, np.linalg.inv(ref_cov))
except np.linalg.LinAlgError:
distance = np.linalg.norm(ref_mean - cur_mean)
return distance
3. Evidently AI¶
3.1 Evidently 기본 사용¶
"""
Evidently AI 드리프트 감지
"""
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric
)
# 데이터 준비
reference_data = pd.read_csv("reference_data.csv")
current_data = pd.read_csv("current_data.csv")
# 컬럼 매핑
column_mapping = ColumnMapping(
target="target",
prediction="prediction",
numerical_features=["age", "tenure", "monthly_charges"],
categorical_features=["gender", "contract_type"]
)
# 데이터 드리프트 리포트
drift_report = Report(metrics=[
DatasetDriftMetric(),
DataDriftTable(),
])
drift_report.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping
)
# HTML 리포트 저장
drift_report.save_html("drift_report.html")
# JSON 결과 (프로그래밍 사용)
result = drift_report.as_dict()
print(f"Dataset drift detected: {result['metrics'][0]['result']['dataset_drift']}")
3.2 상세 드리프트 분석¶
"""
Evidently 상세 분석
"""
from evidently.report import Report
from evidently.metrics import (
ColumnDriftMetric,
ColumnSummaryMetric,
ColumnQuantileMetric,
ColumnValueRangeMetric
)
# 특정 컬럼 상세 분석
column_report = Report(metrics=[
ColumnDriftMetric(column_name="monthly_charges"),
ColumnSummaryMetric(column_name="monthly_charges"),
ColumnQuantileMetric(column_name="monthly_charges", quantile=0.95),
ColumnValueRangeMetric(column_name="monthly_charges"),
])
column_report.run(
reference_data=reference_data,
current_data=current_data
)
# 결과 추출
result = column_report.as_dict()
for metric in result["metrics"]:
metric_name = metric["metric"]
metric_result = metric["result"]
print(f"{metric_name}: {metric_result}")
3.3 모델 성능 모니터링¶
"""
Evidently 모델 성능 모니터링
"""
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset, RegressionPreset
from evidently.metrics import (
ClassificationQualityMetric,
ClassificationClassBalance,
ClassificationConfusionMatrix
)
# 분류 모델 성능 리포트
classification_report = Report(metrics=[
ClassificationPreset(),
])
classification_report.run(
reference_data=reference_data, # 학습 데이터 + 예측
current_data=current_data, # 현재 데이터 + 예측
column_mapping=column_mapping
)
classification_report.save_html("model_performance_report.html")
# 결과 추출
result = classification_report.as_dict()
performance = result["metrics"][0]["result"]["current"]
print(f"Accuracy: {performance['accuracy']}")
print(f"Precision: {performance['precision']}")
print(f"Recall: {performance['recall']}")
3.4 실시간 모니터링¶
"""
Evidently Test Suite (자동화된 검사)
"""
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset
from evidently.tests import (
TestColumnDrift,
TestShareOfMissingValues,
TestColumnValueRange,
TestNumberOfRows
)
# 테스트 스위트 정의
test_suite = TestSuite(tests=[
# 프리셋
DataDriftTestPreset(),
# 개별 테스트
TestColumnDrift(column_name="monthly_charges", stattest_threshold=0.1),
TestShareOfMissingValues(column_name="age", lt=0.05),
TestColumnValueRange(column_name="age", left=18, right=100),
TestNumberOfRows(gte=1000),
])
test_suite.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping
)
# 결과 확인
result = test_suite.as_dict()
print(f"Tests passed: {result['summary']['success_tests']}")
print(f"Tests failed: {result['summary']['failed_tests']}")
# 실패한 테스트 상세
for test in result["tests"]:
if test["status"] == "FAIL":
print(f"FAILED: {test['name']} - {test['description']}")
4. 모니터링 파이프라인¶
4.1 완전한 모니터링 시스템¶
"""
프로덕션 모니터링 파이프라인
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Any, List
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric
class ModelMonitor:
"""ML 모델 모니터링 시스템"""
def __init__(
self,
reference_data: pd.DataFrame,
column_mapping,
alert_thresholds: Dict[str, float]
):
self.reference_data = reference_data
self.column_mapping = column_mapping
self.thresholds = alert_thresholds
self.monitoring_history = []
def check_data_drift(self, current_data: pd.DataFrame) -> Dict[str, Any]:
"""데이터 드리프트 검사"""
report = Report(metrics=[DatasetDriftMetric()])
report.run(
reference_data=self.reference_data,
current_data=current_data,
column_mapping=self.column_mapping
)
result = report.as_dict()
drift_result = result["metrics"][0]["result"]
return {
"timestamp": datetime.now().isoformat(),
"dataset_drift": drift_result["dataset_drift"],
"drift_share": drift_result["drift_share"],
"number_of_drifted_columns": drift_result["number_of_drifted_columns"],
"columns": drift_result.get("drift_by_columns", {})
}
def check_model_performance(
self,
predictions: pd.Series,
actuals: pd.Series
) -> Dict[str, float]:
"""모델 성능 검사"""
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
metrics = {
"accuracy": accuracy_score(actuals, predictions),
"precision": precision_score(actuals, predictions, average="macro"),
"recall": recall_score(actuals, predictions, average="macro"),
"f1_score": f1_score(actuals, predictions, average="macro")
}
return metrics
def generate_alerts(self, drift_result: Dict, performance: Dict) -> List[str]:
"""알림 생성"""
alerts = []
# 데이터 드리프트 알림
if drift_result["dataset_drift"]:
alerts.append(f"DATA_DRIFT: {drift_result['drift_share']:.1%} of features drifted")
# 성능 저하 알림
for metric, value in performance.items():
if metric in self.thresholds and value < self.thresholds[metric]:
alerts.append(
f"PERFORMANCE_DEGRADATION: {metric} = {value:.4f} "
f"(threshold: {self.thresholds[metric]})"
)
return alerts
def run_monitoring(
self,
current_data: pd.DataFrame,
predictions: pd.Series = None,
actuals: pd.Series = None
) -> Dict[str, Any]:
"""전체 모니터링 실행"""
result = {
"timestamp": datetime.now().isoformat(),
"data_drift": self.check_data_drift(current_data),
"alerts": []
}
if predictions is not None and actuals is not None:
result["performance"] = self.check_model_performance(predictions, actuals)
result["alerts"] = self.generate_alerts(
result["data_drift"],
result["performance"]
)
else:
result["alerts"] = self.generate_alerts(result["data_drift"], {})
self.monitoring_history.append(result)
return result
# 사용 예시
monitor = ModelMonitor(
reference_data=reference_data,
column_mapping=column_mapping,
alert_thresholds={
"accuracy": 0.85,
"f1_score": 0.80
}
)
# 매시간 모니터링
result = monitor.run_monitoring(
current_data=hourly_data,
predictions=predictions,
actuals=actuals
)
if result["alerts"]:
for alert in result["alerts"]:
print(f"ALERT: {alert}")
# send_slack_notification(alert)
# send_email_alert(alert)
4.2 Prometheus + Grafana 통합¶
"""
Prometheus 메트릭 노출
"""
from prometheus_client import Gauge, Counter, Histogram, start_http_server
import time
# 메트릭 정의
DRIFT_SCORE = Gauge(
"model_drift_score",
"Current drift score",
["feature"]
)
PREDICTION_ACCURACY = Gauge(
"model_accuracy",
"Current model accuracy"
)
PREDICTIONS_TOTAL = Counter(
"predictions_total",
"Total predictions made",
["model_version"]
)
PREDICTION_LATENCY = Histogram(
"prediction_latency_seconds",
"Prediction latency in seconds",
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0]
)
class PrometheusMonitor:
"""Prometheus 메트릭 업데이트"""
def __init__(self, port: int = 9090):
start_http_server(port)
def update_drift_metrics(self, drift_result: dict):
"""드리프트 메트릭 업데이트"""
for feature, is_drifted in drift_result.get("columns", {}).items():
DRIFT_SCORE.labels(feature=feature).set(
1 if is_drifted else 0
)
def update_performance_metrics(self, metrics: dict):
"""성능 메트릭 업데이트"""
PREDICTION_ACCURACY.set(metrics.get("accuracy", 0))
def record_prediction(self, model_version: str, latency: float):
"""예측 기록"""
PREDICTIONS_TOTAL.labels(model_version=model_version).inc()
PREDICTION_LATENCY.observe(latency)
# 사용
prom_monitor = PrometheusMonitor(port=9090)
prom_monitor.update_drift_metrics(drift_result)
prom_monitor.update_performance_metrics(performance_metrics)
5. 알림 설정¶
5.1 Slack 알림¶
"""
Slack 알림 연동
"""
import requests
import json
class SlackAlerter:
"""Slack 알림 발송"""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send_alert(
self,
title: str,
message: str,
severity: str = "warning"
):
"""알림 발송"""
color = {
"info": "#36a64f",
"warning": "#ff9800",
"critical": "#ff0000"
}.get(severity, "#808080")
payload = {
"attachments": [
{
"color": color,
"title": title,
"text": message,
"fields": [
{
"title": "Severity",
"value": severity.upper(),
"short": True
},
{
"title": "Timestamp",
"value": datetime.now().isoformat(),
"short": True
}
]
}
]
}
response = requests.post(
self.webhook_url,
json=payload
)
return response.status_code == 200
def send_drift_alert(self, drift_result: dict):
"""드리프트 알림"""
if drift_result["dataset_drift"]:
drifted_cols = drift_result.get("number_of_drifted_columns", 0)
drift_share = drift_result.get("drift_share", 0)
self.send_alert(
title="Data Drift Detected",
message=f"{drifted_cols} features drifted ({drift_share:.1%})",
severity="critical" if drift_share > 0.5 else "warning"
)
# 사용
alerter = SlackAlerter(webhook_url="https://hooks.slack.com/services/...")
alerter.send_drift_alert(drift_result)
5.2 자동 재학습 트리거¶
"""
자동 재학습 트리거
"""
class RetrainingTrigger:
"""자동 재학습 트리거"""
def __init__(
self,
drift_threshold: float = 0.3,
performance_threshold: float = 0.85,
cooldown_hours: int = 24
):
self.drift_threshold = drift_threshold
self.performance_threshold = performance_threshold
self.cooldown_hours = cooldown_hours
self.last_retrain = None
def should_retrain(
self,
drift_score: float,
performance: float
) -> tuple[bool, str]:
"""재학습 필요 여부 판단"""
# 쿨다운 체크
if self.last_retrain:
hours_since = (datetime.now() - self.last_retrain).total_seconds() / 3600
if hours_since < self.cooldown_hours:
return False, f"In cooldown period ({hours_since:.1f}h)"
# 드리프트 기반
if drift_score > self.drift_threshold:
return True, f"High drift score: {drift_score:.2f}"
# 성능 기반
if performance < self.performance_threshold:
return True, f"Low performance: {performance:.4f}"
return False, "No retraining needed"
def trigger_retraining(self, reason: str):
"""재학습 트리거"""
self.last_retrain = datetime.now()
# 파이프라인 트리거 (예: Airflow, Kubeflow)
# trigger_training_pipeline()
print(f"Retraining triggered: {reason}")
return True
# 사용
trigger = RetrainingTrigger(
drift_threshold=0.3,
performance_threshold=0.85
)
should_retrain, reason = trigger.should_retrain(
drift_score=0.35,
performance=0.82
)
if should_retrain:
trigger.trigger_retraining(reason)
연습 문제¶
문제 1: 드리프트 감지¶
합성 데이터로 데이터 드리프트를 시뮬레이션하고 감지하세요.
문제 2: Evidently 리포트¶
실제 데이터셋에 대해 완전한 Evidently 리포트를 생성하세요.
문제 3: 알림 시스템¶
드리프트 감지 시 자동으로 알림을 보내는 시스템을 구축하세요.
요약¶
| 드리프트 유형 | 설명 | 감지 방법 |
|---|---|---|
| Data Drift | 입력 분포 변화 | PSI, KS Test |
| Concept Drift | 입출력 관계 변화 | 성능 모니터링 |
| Label Drift | 타겟 분포 변화 | 클래스 분포 비교 |