10. Drift Detection & Monitoring

10. Drift Detection & Monitoring

1. Drift Concepts

Drift is the phenomenon where data or model performance changes over time.

1.1 Drift Types

┌─────────────────────────────────────────────────────────────────────┐
                        Drift Types                                   
├─────────────────────────────────────────────────────────────────────┤
                                                                     
   ┌─────────────────────────────────────────────────────────┐      
                     Data Drift                                    
                                                                   
      Input data distribution changes                              
      P(X)_train  P(X)_production                                
                                                                   
      Example: Customer age distribution changes                   
   └─────────────────────────────────────────────────────────┘      
                                                                     
   ┌─────────────────────────────────────────────────────────┐      
                     Concept Drift                                 
                                                                   
      Relationship between input and output changes                
      P(Y|X)_train  P(Y|X)_production                            
                                                                   
      Example: Economic changes alter churn patterns               
   └─────────────────────────────────────────────────────────┘      
                                                                     
   ┌─────────────────────────────────────────────────────────┐      
                     Label Drift                                   
                                                                   
      Target variable distribution changes                         
      P(Y)_train  P(Y)_production                                
                                                                   
      Example: Fraud rate increases                                
   └─────────────────────────────────────────────────────────┘      
                                                                     
└─────────────────────────────────────────────────────────────────────┘

1.2 Drift Causes

"""
Drift cause analysis
"""

drift_causes = {
    "data_collection_changes": {
        "examples": ["Sensor malfunction", "Data source change", "Logging bug"],
        "detection": "Schema validation, statistical monitoring"
    },
    "external_environment_changes": {
        "examples": ["Seasonality", "Competitor actions", "Economic conditions", "Regulatory changes"],
        "detection": "Domain knowledge, long-term trend analysis"
    },
    "user_behavior_changes": {
        "examples": ["New user influx", "Usage pattern changes", "Generational shifts"],
        "detection": "Cohort analysis, A/B testing"
    },
    "feature_engineering_errors": {
        "examples": ["Upstream data changes", "Preprocessing bugs"],
        "detection": "Feature store validation, unit tests"
    }
}

2. Drift Detection Techniques

2.1 Statistical Tests

"""
Drift detection statistical methods
"""

import numpy as np
from scipy import stats
from typing import Tuple

def kolmogorov_smirnov_test(
    reference: np.ndarray,
    current: np.ndarray,
    threshold: float = 0.05
) -> Tuple[float, bool]:
    """KS test - Test difference between two distributions"""
    statistic, p_value = stats.ks_2samp(reference, current)
    is_drift = p_value < threshold
    return statistic, is_drift

def population_stability_index(
    reference: np.ndarray,
    current: np.ndarray,
    n_bins: int = 10
) -> float:
    """PSI - Population Stability Index"""
    # Create histograms
    bins = np.histogram_bin_edges(reference, bins=n_bins)
    ref_hist, _ = np.histogram(reference, bins=bins, density=True)
    cur_hist, _ = np.histogram(current, bins=bins, density=True)

    # Prevent zeros
    ref_hist = np.where(ref_hist == 0, 0.0001, ref_hist)
    cur_hist = np.where(cur_hist == 0, 0.0001, cur_hist)

    # Calculate PSI
    psi = np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist))

    return psi

def wasserstein_distance(
    reference: np.ndarray,
    current: np.ndarray
) -> float:
    """Wasserstein distance (Earth Mover's Distance)"""
    return stats.wasserstein_distance(reference, current)

def jensen_shannon_divergence(
    reference: np.ndarray,
    current: np.ndarray,
    n_bins: int = 10
) -> float:
    """Jensen-Shannon Divergence"""
    from scipy.spatial.distance import jensenshannon

    bins = np.histogram_bin_edges(reference, bins=n_bins)
    ref_hist, _ = np.histogram(reference, bins=bins, density=True)
    cur_hist, _ = np.histogram(current, bins=bins, density=True)

    return jensenshannon(ref_hist, cur_hist)

# Interpretation guidelines
psi_thresholds = {
    "PSI < 0.1": "No change",
    "0.1 <= PSI < 0.2": "Slight change",
    "PSI >= 0.2": "Significant change - retraining required"
}

2.2 Multivariate Drift Detection

"""
Multivariate drift detection
"""

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

def domain_classifier_drift(
    reference: np.ndarray,
    current: np.ndarray,
    threshold: float = 0.55
) -> Tuple[float, bool]:
    """
    Domain classifier-based drift detection
    - Train classifier to distinguish reference from current data
    - AUC close to 0.5 indicates no drift
    """
    # Generate labels
    X = np.vstack([reference, current])
    y = np.hstack([
        np.zeros(len(reference)),
        np.ones(len(current))
    ])

    # Train and evaluate classifier
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
    mean_auc = scores.mean()

    # Further from 0.5, higher drift probability
    drift_score = abs(mean_auc - 0.5) * 2
    is_drift = mean_auc > threshold

    return drift_score, is_drift

def multivariate_drift_pca(
    reference: np.ndarray,
    current: np.ndarray,
    n_components: int = 5
) -> float:
    """
    PCA-based multivariate drift detection
    """
    from sklearn.decomposition import PCA
    from scipy.spatial.distance import mahalanobis

    # PCA dimensionality reduction
    pca = PCA(n_components=n_components)
    ref_pca = pca.fit_transform(reference)
    cur_pca = pca.transform(current)

    # Mean and covariance for each dimension
    ref_mean = np.mean(ref_pca, axis=0)
    cur_mean = np.mean(cur_pca, axis=0)
    ref_cov = np.cov(ref_pca.T)

    # Mahalanobis distance
    try:
        distance = mahalanobis(ref_mean, cur_mean, np.linalg.inv(ref_cov))
    except np.linalg.LinAlgError:
        distance = np.linalg.norm(ref_mean - cur_mean)

    return distance

3. Evidently AI

3.1 Evidently Basics

"""
Evidently AI drift detection
"""

import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import (
    DatasetDriftMetric,
    DataDriftTable,
    ColumnDriftMetric
)

# Prepare data
reference_data = pd.read_csv("reference_data.csv")
current_data = pd.read_csv("current_data.csv")

# Column mapping
column_mapping = ColumnMapping(
    target="target",
    prediction="prediction",
    numerical_features=["age", "tenure", "monthly_charges"],
    categorical_features=["gender", "contract_type"]
)

# Data drift report
drift_report = Report(metrics=[
    DatasetDriftMetric(),
    DataDriftTable(),
])

drift_report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping
)

# Save HTML report
drift_report.save_html("drift_report.html")

# JSON results (for programming use)
result = drift_report.as_dict()
print(f"Dataset drift detected: {result['metrics'][0]['result']['dataset_drift']}")

3.2 Detailed Drift Analysis

"""
Evidently detailed analysis
"""

from evidently.report import Report
from evidently.metrics import (
    ColumnDriftMetric,
    ColumnSummaryMetric,
    ColumnQuantileMetric,
    ColumnValueRangeMetric
)

# Detailed analysis for specific column
column_report = Report(metrics=[
    ColumnDriftMetric(column_name="monthly_charges"),
    ColumnSummaryMetric(column_name="monthly_charges"),
    ColumnQuantileMetric(column_name="monthly_charges", quantile=0.95),
    ColumnValueRangeMetric(column_name="monthly_charges"),
])

column_report.run(
    reference_data=reference_data,
    current_data=current_data
)

# Extract results
result = column_report.as_dict()

for metric in result["metrics"]:
    metric_name = metric["metric"]
    metric_result = metric["result"]
    print(f"{metric_name}: {metric_result}")

3.3 Model Performance Monitoring

"""
Evidently model performance monitoring
"""

from evidently.report import Report
from evidently.metric_preset import ClassificationPreset, RegressionPreset
from evidently.metrics import (
    ClassificationQualityMetric,
    ClassificationClassBalance,
    ClassificationConfusionMatrix
)

# Classification model performance report
classification_report = Report(metrics=[
    ClassificationPreset(),
])

classification_report.run(
    reference_data=reference_data,  # Training data + predictions
    current_data=current_data,       # Current data + predictions
    column_mapping=column_mapping
)

classification_report.save_html("model_performance_report.html")

# Extract results
result = classification_report.as_dict()
performance = result["metrics"][0]["result"]["current"]

print(f"Accuracy: {performance['accuracy']}")
print(f"Precision: {performance['precision']}")
print(f"Recall: {performance['recall']}")

3.4 Real-time Monitoring

"""
Evidently Test Suite (automated testing)
"""

from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset
from evidently.tests import (
    TestColumnDrift,
    TestShareOfMissingValues,
    TestColumnValueRange,
    TestNumberOfRows
)

# Define test suite
test_suite = TestSuite(tests=[
    # Presets
    DataDriftTestPreset(),

    # Individual tests
    TestColumnDrift(column_name="monthly_charges", stattest_threshold=0.1),
    TestShareOfMissingValues(column_name="age", lt=0.05),
    TestColumnValueRange(column_name="age", left=18, right=100),
    TestNumberOfRows(gte=1000),
])

test_suite.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping
)

# Check results
result = test_suite.as_dict()

print(f"Tests passed: {result['summary']['success_tests']}")
print(f"Tests failed: {result['summary']['failed_tests']}")

# Failed test details
for test in result["tests"]:
    if test["status"] == "FAIL":
        print(f"FAILED: {test['name']} - {test['description']}")

4. Monitoring Pipeline

4.1 Complete Monitoring System

"""
Production monitoring pipeline
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Any, List
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric

class ModelMonitor:
    """ML model monitoring system"""

    def __init__(
        self,
        reference_data: pd.DataFrame,
        column_mapping,
        alert_thresholds: Dict[str, float]
    ):
        self.reference_data = reference_data
        self.column_mapping = column_mapping
        self.thresholds = alert_thresholds
        self.monitoring_history = []

    def check_data_drift(self, current_data: pd.DataFrame) -> Dict[str, Any]:
        """Check data drift"""
        report = Report(metrics=[DatasetDriftMetric()])
        report.run(
            reference_data=self.reference_data,
            current_data=current_data,
            column_mapping=self.column_mapping
        )

        result = report.as_dict()
        drift_result = result["metrics"][0]["result"]

        return {
            "timestamp": datetime.now().isoformat(),
            "dataset_drift": drift_result["dataset_drift"],
            "drift_share": drift_result["drift_share"],
            "number_of_drifted_columns": drift_result["number_of_drifted_columns"],
            "columns": drift_result.get("drift_by_columns", {})
        }

    def check_model_performance(
        self,
        predictions: pd.Series,
        actuals: pd.Series
    ) -> Dict[str, float]:
        """Check model performance"""
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

        metrics = {
            "accuracy": accuracy_score(actuals, predictions),
            "precision": precision_score(actuals, predictions, average="macro"),
            "recall": recall_score(actuals, predictions, average="macro"),
            "f1_score": f1_score(actuals, predictions, average="macro")
        }

        return metrics

    def generate_alerts(self, drift_result: Dict, performance: Dict) -> List[str]:
        """Generate alerts"""
        alerts = []

        # Data drift alerts
        if drift_result["dataset_drift"]:
            alerts.append(f"DATA_DRIFT: {drift_result['drift_share']:.1%} of features drifted")

        # Performance degradation alerts
        for metric, value in performance.items():
            if metric in self.thresholds and value < self.thresholds[metric]:
                alerts.append(
                    f"PERFORMANCE_DEGRADATION: {metric} = {value:.4f} "
                    f"(threshold: {self.thresholds[metric]})"
                )

        return alerts

    def run_monitoring(
        self,
        current_data: pd.DataFrame,
        predictions: pd.Series = None,
        actuals: pd.Series = None
    ) -> Dict[str, Any]:
        """Run full monitoring"""
        result = {
            "timestamp": datetime.now().isoformat(),
            "data_drift": self.check_data_drift(current_data),
            "alerts": []
        }

        if predictions is not None and actuals is not None:
            result["performance"] = self.check_model_performance(predictions, actuals)
            result["alerts"] = self.generate_alerts(
                result["data_drift"],
                result["performance"]
            )
        else:
            result["alerts"] = self.generate_alerts(result["data_drift"], {})

        self.monitoring_history.append(result)

        return result

# Example usage
monitor = ModelMonitor(
    reference_data=reference_data,
    column_mapping=column_mapping,
    alert_thresholds={
        "accuracy": 0.85,
        "f1_score": 0.80
    }
)

# Hourly monitoring
result = monitor.run_monitoring(
    current_data=hourly_data,
    predictions=predictions,
    actuals=actuals
)

if result["alerts"]:
    for alert in result["alerts"]:
        print(f"ALERT: {alert}")
        # send_slack_notification(alert)
        # send_email_alert(alert)

4.2 Prometheus + Grafana Integration

"""
Expose Prometheus metrics
"""

from prometheus_client import Gauge, Counter, Histogram, start_http_server
import time

# Define metrics
DRIFT_SCORE = Gauge(
    "model_drift_score",
    "Current drift score",
    ["feature"]
)

PREDICTION_ACCURACY = Gauge(
    "model_accuracy",
    "Current model accuracy"
)

PREDICTIONS_TOTAL = Counter(
    "predictions_total",
    "Total predictions made",
    ["model_version"]
)

PREDICTION_LATENCY = Histogram(
    "prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0]
)

class PrometheusMonitor:
    """Update Prometheus metrics"""

    def __init__(self, port: int = 9090):
        start_http_server(port)

    def update_drift_metrics(self, drift_result: dict):
        """Update drift metrics"""
        for feature, is_drifted in drift_result.get("columns", {}).items():
            DRIFT_SCORE.labels(feature=feature).set(
                1 if is_drifted else 0
            )

    def update_performance_metrics(self, metrics: dict):
        """Update performance metrics"""
        PREDICTION_ACCURACY.set(metrics.get("accuracy", 0))

    def record_prediction(self, model_version: str, latency: float):
        """Record prediction"""
        PREDICTIONS_TOTAL.labels(model_version=model_version).inc()
        PREDICTION_LATENCY.observe(latency)

# Usage
prom_monitor = PrometheusMonitor(port=9090)
prom_monitor.update_drift_metrics(drift_result)
prom_monitor.update_performance_metrics(performance_metrics)

5. Alert Configuration

5.1 Slack Alerts

"""
Slack integration
"""

import requests
import json

class SlackAlerter:
    """Send Slack alerts"""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_alert(
        self,
        title: str,
        message: str,
        severity: str = "warning"
    ):
        """Send alert"""
        color = {
            "info": "#36a64f",
            "warning": "#ff9800",
            "critical": "#ff0000"
        }.get(severity, "#808080")

        payload = {
            "attachments": [
                {
                    "color": color,
                    "title": title,
                    "text": message,
                    "fields": [
                        {
                            "title": "Severity",
                            "value": severity.upper(),
                            "short": True
                        },
                        {
                            "title": "Timestamp",
                            "value": datetime.now().isoformat(),
                            "short": True
                        }
                    ]
                }
            ]
        }

        response = requests.post(
            self.webhook_url,
            json=payload
        )

        return response.status_code == 200

    def send_drift_alert(self, drift_result: dict):
        """Send drift alert"""
        if drift_result["dataset_drift"]:
            drifted_cols = drift_result.get("number_of_drifted_columns", 0)
            drift_share = drift_result.get("drift_share", 0)

            self.send_alert(
                title="Data Drift Detected",
                message=f"{drifted_cols} features drifted ({drift_share:.1%})",
                severity="critical" if drift_share > 0.5 else "warning"
            )

# Usage
alerter = SlackAlerter(webhook_url="https://hooks.slack.com/services/...")
alerter.send_drift_alert(drift_result)

5.2 Automated Retraining Trigger

"""
Automated retraining trigger
"""

class RetrainingTrigger:
    """Automated retraining trigger"""

    def __init__(
        self,
        drift_threshold: float = 0.3,
        performance_threshold: float = 0.85,
        cooldown_hours: int = 24
    ):
        self.drift_threshold = drift_threshold
        self.performance_threshold = performance_threshold
        self.cooldown_hours = cooldown_hours
        self.last_retrain = None

    def should_retrain(
        self,
        drift_score: float,
        performance: float
    ) -> tuple[bool, str]:
        """Determine if retraining needed"""
        # Cooldown check
        if self.last_retrain:
            hours_since = (datetime.now() - self.last_retrain).total_seconds() / 3600
            if hours_since < self.cooldown_hours:
                return False, f"In cooldown period ({hours_since:.1f}h)"

        # Drift-based
        if drift_score > self.drift_threshold:
            return True, f"High drift score: {drift_score:.2f}"

        # Performance-based
        if performance < self.performance_threshold:
            return True, f"Low performance: {performance:.4f}"

        return False, "No retraining needed"

    def trigger_retraining(self, reason: str):
        """Trigger retraining"""
        self.last_retrain = datetime.now()

        # Trigger pipeline (e.g., Airflow, Kubeflow)
        # trigger_training_pipeline()

        print(f"Retraining triggered: {reason}")
        return True

# Usage
trigger = RetrainingTrigger(
    drift_threshold=0.3,
    performance_threshold=0.85
)

should_retrain, reason = trigger.should_retrain(
    drift_score=0.35,
    performance=0.82
)

if should_retrain:
    trigger.trigger_retraining(reason)

Practice Exercises

Exercise 1: Drift Detection

Simulate and detect data drift using synthetic data.

Exercise 2: Evidently Reports

Generate complete Evidently reports for a real dataset.

Exercise 3: Alert System

Build a system that automatically sends alerts when drift is detected.


Summary

Drift Type Description Detection Method
Data Drift Input distribution change PSI, KS Test
Concept Drift Input-output relationship change Performance monitoring
Label Drift Target distribution change Class distribution comparison

References

to navigate between lessons