모델 서빙 기초

모델 서빙 기초

1. 모델 서빙 개념

모델 서빙은 학습된 ML 모델을 프로덕션 환경에서 예측 서비스로 제공하는 것입니다.

1.1 서빙 아키텍처

┌─────────────────────────────────────────────────────────────────────┐
│                      모델 서빙 아키텍처                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────┐                                                       │
│   │ Client  │                                                       │
│   │(App/Web)│                                                       │
│   └────┬────┘                                                       │
│        │                                                            │
│        ▼                                                            │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐          │
│   │    Load     │────▶│    API      │────▶│   Model     │          │
│   │  Balancer   │     │  Gateway    │     │   Server    │          │
│   └─────────────┘     └─────────────┘     └──────┬──────┘          │
│                                                   │                 │
│                             ┌─────────────────────┼────────┐       │
│                             │                     │        │       │
│                             ▼                     ▼        ▼       │
│                       ┌──────────┐          ┌──────────┐   ...     │
│                       │ Model A  │          │ Model B  │           │
│                       │ (v1.2.0) │          │ (v2.0.0) │           │
│                       └──────────┘          └──────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 서빙 방식 비교

"""
모델 서빙 방식
"""

serving_methods = {
    "batch_inference": {
        "description": "대량 데이터를 일괄 처리",
        "latency": "높음 (분~시간)",
        "use_cases": ["추천 시스템 사전 계산", "리포트 생성", "데이터 파이프라인"],
        "pros": ["높은 처리량", "비용 효율적"],
        "cons": ["실시간 불가", "데이터 지연"]
    },
    "online_inference": {
        "description": "실시간 요청-응답",
        "latency": "낮음 (ms)",
        "use_cases": ["사기 탐지", "검색 랭킹", "챗봇"],
        "pros": ["실시간 응답", "최신 데이터"],
        "cons": ["인프라 비용", "복잡한 운영"]
    },
    "streaming_inference": {
        "description": "연속 데이터 스트림 처리",
        "latency": "중간 (초)",
        "use_cases": ["IoT 이상 탐지", "실시간 분석"],
        "pros": ["연속 처리", "이벤트 기반"],
        "cons": ["복잡한 아키텍처"]
    }
}

2. REST API 배포

2.1 Flask로 모델 서빙

"""
Flask 기반 모델 서빙
"""

from flask import Flask, request, jsonify
import joblib
import pandas as pd
import numpy as np
from typing import Dict, Any

app = Flask(__name__)

# 모델 로드 (애플리케이션 시작 시)
model = joblib.load("model.pkl")
scaler = joblib.load("scaler.pkl")

@app.route("/health", methods=["GET"])
def health():
    """헬스 체크 엔드포인트"""
    return jsonify({"status": "healthy"})

@app.route("/predict", methods=["POST"])
def predict():
    """예측 엔드포인트"""
    try:
        # 입력 데이터 파싱
        data = request.get_json()
        features = data.get("features")

        if features is None:
            return jsonify({"error": "Missing 'features' in request"}), 400

        # DataFrame 변환
        df = pd.DataFrame([features])

        # 전처리
        df_scaled = scaler.transform(df)

        # 예측
        prediction = model.predict(df_scaled)[0]
        probability = model.predict_proba(df_scaled)[0].tolist()

        return jsonify({
            "prediction": int(prediction),
            "probability": probability,
            "model_version": "1.0.0"
        })

    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/batch_predict", methods=["POST"])
def batch_predict():
    """배치 예측 엔드포인트"""
    try:
        data = request.get_json()
        instances = data.get("instances", [])

        df = pd.DataFrame(instances)
        df_scaled = scaler.transform(df)

        predictions = model.predict(df_scaled).tolist()
        probabilities = model.predict_proba(df_scaled).tolist()

        return jsonify({
            "predictions": predictions,
            "probabilities": probabilities
        })

    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

2.2 FastAPI로 모델 서빙

"""
FastAPI 기반 모델 서빙 (권장)
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional
import joblib
import numpy as np
import pandas as pd
import uvicorn

app = FastAPI(
    title="ML Model API",
    description="Machine Learning Model Serving API",
    version="1.0.0"
)

# Pydantic 모델 정의
class PredictionInput(BaseModel):
    """예측 입력 스키마"""
    age: float = Field(..., description="Customer age")
    tenure: int = Field(..., description="Months as customer")
    monthly_charges: float = Field(..., description="Monthly charges")
    total_charges: float = Field(..., description="Total charges")

    class Config:
        json_schema_extra = {
            "example": {
                "age": 35.0,
                "tenure": 24,
                "monthly_charges": 65.5,
                "total_charges": 1572.0
            }
        }

class PredictionOutput(BaseModel):
    """예측 출력 스키마"""
    prediction: int
    probability: List[float]
    model_version: str

class BatchInput(BaseModel):
    """배치 입력 스키마"""
    instances: List[PredictionInput]

class BatchOutput(BaseModel):
    """배치 출력 스키마"""
    predictions: List[int]
    probabilities: List[List[float]]

# 모델 로드
model = None
scaler = None

@app.on_event("startup")
async def load_model():
    """애플리케이션 시작 시 모델 로드"""
    global model, scaler
    model = joblib.load("model.pkl")
    scaler = joblib.load("scaler.pkl")
    print("Model loaded successfully")

@app.get("/health")
async def health():
    """헬스 체크"""
    return {"status": "healthy", "model_loaded": model is not None}

@app.post("/predict", response_model=PredictionOutput)
async def predict(input_data: PredictionInput):
    """단일 예측"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # 입력을 DataFrame으로 변환
    df = pd.DataFrame([input_data.dict()])

    # 전처리 및 예측
    df_scaled = scaler.transform(df)
    prediction = int(model.predict(df_scaled)[0])
    probability = model.predict_proba(df_scaled)[0].tolist()

    return PredictionOutput(
        prediction=prediction,
        probability=probability,
        model_version="1.0.0"
    )

@app.post("/batch_predict", response_model=BatchOutput)
async def batch_predict(input_data: BatchInput):
    """배치 예측"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # DataFrame 변환
    instances = [item.dict() for item in input_data.instances]
    df = pd.DataFrame(instances)

    # 전처리 및 예측
    df_scaled = scaler.transform(df)
    predictions = model.predict(df_scaled).tolist()
    probabilities = model.predict_proba(df_scaled).tolist()

    return BatchOutput(
        predictions=predictions,
        probabilities=probabilities
    )

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

2.3 Docker 컨테이너화

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 애플리케이션 코드 복사
COPY app.py .
COPY model.pkl .
COPY scaler.pkl .

# 포트 노출
EXPOSE 8000

# 실행
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# requirements.txt
fastapi==0.104.0
uvicorn==0.24.0
pydantic==2.5.0
scikit-learn==1.3.0
pandas==2.1.0
joblib==1.3.0
numpy==1.24.0
# 빌드 및 실행
docker build -t ml-model-api:latest .
docker run -p 8000:8000 ml-model-api:latest

3. gRPC 서빙

3.1 Proto 정의

// prediction.proto
syntax = "proto3";

package prediction;

service PredictionService {
    rpc Predict (PredictRequest) returns (PredictResponse);
    rpc BatchPredict (BatchPredictRequest) returns (BatchPredictResponse);
    rpc HealthCheck (HealthRequest) returns (HealthResponse);
}

message PredictRequest {
    repeated float features = 1;
}

message PredictResponse {
    int32 prediction = 1;
    repeated float probabilities = 2;
    string model_version = 3;
}

message BatchPredictRequest {
    repeated PredictRequest instances = 1;
}

message BatchPredictResponse {
    repeated int32 predictions = 1;
    repeated Probabilities probabilities = 2;
}

message Probabilities {
    repeated float values = 1;
}

message HealthRequest {}

message HealthResponse {
    string status = 1;
    bool model_loaded = 2;
}

3.2 gRPC 서버 구현

"""
gRPC 모델 서버
"""

import grpc
from concurrent import futures
import joblib
import numpy as np

# Proto에서 생성된 코드
import prediction_pb2
import prediction_pb2_grpc

class PredictionServicer(prediction_pb2_grpc.PredictionServiceServicer):
    """gRPC 서비스 구현"""

    def __init__(self):
        self.model = joblib.load("model.pkl")
        self.scaler = joblib.load("scaler.pkl")
        print("Model loaded")

    def Predict(self, request, context):
        """단일 예측"""
        features = np.array(request.features).reshape(1, -1)
        features_scaled = self.scaler.transform(features)

        prediction = int(self.model.predict(features_scaled)[0])
        probabilities = self.model.predict_proba(features_scaled)[0].tolist()

        return prediction_pb2.PredictResponse(
            prediction=prediction,
            probabilities=probabilities,
            model_version="1.0.0"
        )

    def BatchPredict(self, request, context):
        """배치 예측"""
        features = np.array([list(inst.features) for inst in request.instances])
        features_scaled = self.scaler.transform(features)

        predictions = self.model.predict(features_scaled).tolist()
        probabilities = self.model.predict_proba(features_scaled).tolist()

        prob_messages = [
            prediction_pb2.Probabilities(values=p)
            for p in probabilities
        ]

        return prediction_pb2.BatchPredictResponse(
            predictions=predictions,
            probabilities=prob_messages
        )

    def HealthCheck(self, request, context):
        """헬스 체크"""
        return prediction_pb2.HealthResponse(
            status="healthy",
            model_loaded=True
        )

def serve():
    """gRPC 서버 시작"""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    prediction_pb2_grpc.add_PredictionServiceServicer_to_server(
        PredictionServicer(), server
    )
    server.add_insecure_port("[::]:50051")
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()

if __name__ == "__main__":
    serve()

3.3 gRPC 클라이언트

"""
gRPC 클라이언트
"""

import grpc
import prediction_pb2
import prediction_pb2_grpc

def predict_single(features: list):
    """단일 예측 요청"""
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = prediction_pb2_grpc.PredictionServiceStub(channel)

        request = prediction_pb2.PredictRequest(features=features)
        response = stub.Predict(request)

        return {
            "prediction": response.prediction,
            "probabilities": list(response.probabilities),
            "model_version": response.model_version
        }

def predict_batch(instances: list):
    """배치 예측 요청"""
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = prediction_pb2_grpc.PredictionServiceStub(channel)

        request = prediction_pb2.BatchPredictRequest(
            instances=[
                prediction_pb2.PredictRequest(features=inst)
                for inst in instances
            ]
        )
        response = stub.BatchPredict(request)

        return {
            "predictions": list(response.predictions),
            "probabilities": [list(p.values) for p in response.probabilities]
        }

# 사용 예시
result = predict_single([35.0, 24, 65.5, 1572.0])
print(result)

batch_result = predict_batch([
    [35.0, 24, 65.5, 1572.0],
    [45.0, 36, 85.0, 3060.0]
])
print(batch_result)

3.4 REST vs gRPC 비교

"""
REST vs gRPC 비교
"""

comparison = {
    "REST/HTTP": {
        "protocol": "HTTP/1.1 or HTTP/2",
        "data_format": "JSON (텍스트)",
        "schema": "선택적 (OpenAPI)",
        "streaming": "제한적",
        "browser_support": "네이티브",
        "use_cases": "일반 웹 API, 브라우저 클라이언트"
    },
    "gRPC": {
        "protocol": "HTTP/2",
        "data_format": "Protocol Buffers (바이너리)",
        "schema": "필수 (.proto)",
        "streaming": "양방향 지원",
        "browser_support": "gRPC-Web 필요",
        "use_cases": "마이크로서비스, 낮은 지연시간 필요"
    }
}

# 성능 비교 (일반적 기준)
performance = {
    "latency": {
        "REST": "~50-100ms",
        "gRPC": "~10-30ms"
    },
    "throughput": {
        "REST": "~1000 req/s",
        "gRPC": "~5000 req/s"
    },
    "payload_size": {
        "REST/JSON": "100 bytes",
        "gRPC/Protobuf": "~50 bytes"
    }
}

4. 배치 추론 vs 실시간 추론

4.1 배치 추론

"""
배치 추론 파이프라인
"""

import pandas as pd
import joblib
from datetime import datetime
import pyarrow.parquet as pq

class BatchInference:
    """배치 추론 클래스"""

    def __init__(self, model_path: str, scaler_path: str):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)

    def run_batch(
        self,
        input_path: str,
        output_path: str,
        batch_size: int = 10000
    ):
        """배치 추론 실행"""
        # 대용량 데이터 청크 단위 읽기
        reader = pq.ParquetFile(input_path)

        results = []

        for batch in reader.iter_batches(batch_size=batch_size):
            df = batch.to_pandas()

            # 피처 추출
            feature_columns = ["age", "tenure", "monthly_charges", "total_charges"]
            features = df[feature_columns]

            # 전처리 및 예측
            features_scaled = self.scaler.transform(features)
            predictions = self.model.predict(features_scaled)
            probabilities = self.model.predict_proba(features_scaled)[:, 1]

            # 결과 추가
            df["prediction"] = predictions
            df["probability"] = probabilities
            df["predicted_at"] = datetime.now()

            results.append(df)

        # 결과 저장
        final_df = pd.concat(results, ignore_index=True)
        final_df.to_parquet(output_path, index=False)

        return len(final_df)

# 스케줄러와 함께 사용 (예: Airflow)
# batch_inference = BatchInference("model.pkl", "scaler.pkl")
# count = batch_inference.run_batch(
#     "s3://bucket/daily_data.parquet",
#     "s3://bucket/predictions.parquet"
# )

4.2 실시간 추론 최적화

"""
실시간 추론 최적화
"""

import numpy as np
from typing import List
from collections import deque
import asyncio
import time

class OptimizedInference:
    """최적화된 실시간 추론"""

    def __init__(self, model, batch_size: int = 32, max_wait_ms: int = 10):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.request_queue = deque()
        self.result_futures = {}

    async def predict(self, features: List[float]) -> dict:
        """단일 예측 (배칭 포함)"""
        request_id = id(features)
        future = asyncio.Future()
        self.result_futures[request_id] = future

        self.request_queue.append((request_id, features))

        # 배치 처리 트리거
        if len(self.request_queue) >= self.batch_size:
            await self._process_batch()
        else:
            # 타임아웃 후 처리
            asyncio.create_task(self._delayed_process())

        return await future

    async def _delayed_process(self):
        """지연 처리"""
        await asyncio.sleep(self.max_wait_ms / 1000)
        if self.request_queue:
            await self._process_batch()

    async def _process_batch(self):
        """배치 처리"""
        if not self.request_queue:
            return

        # 큐에서 요청 추출
        batch = []
        request_ids = []

        while self.request_queue and len(batch) < self.batch_size:
            req_id, features = self.request_queue.popleft()
            batch.append(features)
            request_ids.append(req_id)

        # 배치 예측
        batch_array = np.array(batch)
        predictions = self.model.predict(batch_array)

        # 결과 반환
        for req_id, pred in zip(request_ids, predictions):
            if req_id in self.result_futures:
                self.result_futures[req_id].set_result({"prediction": int(pred)})
                del self.result_futures[req_id]

4.3 모델 캐싱

"""
모델 및 결과 캐싱
"""

from functools import lru_cache
import hashlib
import redis
import json
import pickle

class CachedInference:
    """캐시가 포함된 추론"""

    def __init__(self, model, redis_client: redis.Redis):
        self.model = model
        self.redis = redis_client
        self.cache_ttl = 3600  # 1시간

    def _get_cache_key(self, features: tuple) -> str:
        """캐시 키 생성"""
        features_str = json.dumps(features, sort_keys=True)
        return f"pred:{hashlib.md5(features_str.encode()).hexdigest()}"

    def predict_with_cache(self, features: list) -> dict:
        """캐시된 예측"""
        cache_key = self._get_cache_key(tuple(features))

        # 캐시 확인
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # 예측 수행
        prediction = int(self.model.predict([features])[0])
        probability = self.model.predict_proba([features])[0].tolist()

        result = {
            "prediction": prediction,
            "probability": probability
        }

        # 캐시 저장
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(result)
        )

        return result

# 인메모리 LRU 캐시 (간단한 경우)
@lru_cache(maxsize=10000)
def predict_cached(features_tuple: tuple) -> dict:
    """LRU 캐시된 예측"""
    features = list(features_tuple)
    prediction = model.predict([features])[0]
    return {"prediction": int(prediction)}

5. 서빙 인프라

5.1 Kubernetes 배포

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-api
  labels:
    app: ml-model
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
        - name: api
          image: ml-model-api:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          env:
            - name: MODEL_PATH
              value: "/models/model.pkl"
          volumeMounts:
            - name: model-volume
              mountPath: /models
      volumes:
        - name: model-volume
          persistentVolumeClaim:
            claimName: model-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

5.2 로드 밸런싱

"""
서빙 로드 밸런싱 전략
"""

load_balancing_strategies = {
    "round_robin": {
        "description": "요청을 순차적으로 분배",
        "use_case": "균일한 요청 처리 시간"
    },
    "least_connections": {
        "description": "연결이 가장 적은 서버로 분배",
        "use_case": "요청 처리 시간이 다양한 경우"
    },
    "ip_hash": {
        "description": "클라이언트 IP 기반 고정 서버",
        "use_case": "세션 유지 필요"
    },
    "weighted": {
        "description": "서버 용량에 따른 가중치 분배",
        "use_case": "서버 스펙이 다른 경우"
    }
}

6. 모니터링 및 로깅

"""
서빙 모니터링
"""

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Prometheus 메트릭 정의
PREDICTION_COUNT = Counter(
    "prediction_total",
    "Total predictions",
    ["model_version", "status"]
)

PREDICTION_LATENCY = Histogram(
    "prediction_latency_seconds",
    "Prediction latency",
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

MODEL_LOAD_TIME = Gauge(
    "model_load_time_seconds",
    "Time to load model"
)

# 예측 함수에 메트릭 추가
def predict_with_metrics(features):
    start_time = time.time()

    try:
        result = model.predict([features])
        PREDICTION_COUNT.labels(
            model_version="1.0.0",
            status="success"
        ).inc()
        return result

    except Exception as e:
        PREDICTION_COUNT.labels(
            model_version="1.0.0",
            status="error"
        ).inc()
        raise

    finally:
        latency = time.time() - start_time
        PREDICTION_LATENCY.observe(latency)

# Prometheus 서버 시작
start_http_server(9090)

연습 문제

문제 1: FastAPI 서빙

scikit-learn 모델을 FastAPI로 서빙하는 완전한 API를 작성하세요.

문제 2: Docker 컨테이너화

작성한 API를 Docker 이미지로 빌드하고 실행하세요.

문제 3: 성능 테스트

locust 또는 wrk를 사용하여 API 성능을 측정하세요.


요약

방식 장점 단점 사용 사례
REST API 간단, 범용 상대적 고지연 일반 웹 서비스
gRPC 저지연, 고처리량 복잡성 마이크로서비스
배치 추론 효율적, 비용 절감 실시간 불가 대량 데이터 처리
실시간 추론 즉시 응답 인프라 비용 사기 탐지, 추천

참고 자료

to navigate between lessons