파이프라인과 실무 (Pipeline & Practice)

파이프라인과 실무 (Pipeline & Practice)

개요

sklearn의 Pipeline과 ColumnTransformer를 사용하면 전처리와 모델링을 하나의 워크플로우로 통합할 수 있습니다. 모델 저장과 배포까지 포함한 실무 노하우를 다룹니다.


1. Pipeline 기초

1.1 Pipeline의 필요성

"""
Pipeline 없이 코드 작성 시 문제점:

1. 데이터 누수 (Data Leakage):
   - 테스트 데이터 정보가 학습에 반영
   - 예: 전체 데이터로 스케일링 후 분할

2. 코드 복잡성:
   - 여러 단계를 수동으로 관리
   - 실수 가능성 높음

3. 재현성 문제:
   - 순서 실수
   - 파라미터 불일치

Pipeline 장점:
1. 코드 간소화
2. 데이터 누수 방지
3. 교차 검증과 완벽 통합
4. 하이퍼파라미터 튜닝 용이
5. 모델 저장/배포 편리
"""

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.datasets import load_iris

1.2 기본 Pipeline 생성

# 데이터 로드
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

# Pipeline 생성 (명시적 이름)
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression())
])

# 학습 및 예측
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
score = pipeline.score(X_test, y_test)

print(f"Pipeline 정확도: {score:.4f}")

# make_pipeline (자동 이름)
pipeline_auto = make_pipeline(
    StandardScaler(),
    PCA(n_components=2),
    LogisticRegression()
)

pipeline_auto.fit(X_train, y_train)
print(f"make_pipeline 정확도: {pipeline_auto.score(X_test, y_test):.4f}")

1.3 Pipeline 단계 접근

# 단계 이름 확인
print("Pipeline 단계:")
for name, step in pipeline.named_steps.items():
    print(f"  {name}: {type(step).__name__}")

# 특정 단계 접근
print(f"\nPCA 설명된 분산: {pipeline.named_steps['pca'].explained_variance_ratio_}")
print(f"로지스틱 회귀 계수 형상: {pipeline.named_steps['classifier'].coef_.shape}")

# 중간 단계 결과 얻기
X_scaled = pipeline.named_steps['scaler'].transform(X_test)
X_pca = pipeline.named_steps['pca'].transform(X_scaled)
print(f"\n스케일링 후 형상: {X_scaled.shape}")
print(f"PCA 후 형상: {X_pca.shape}")

2. ColumnTransformer

2.1 다양한 타입의 특성 처리

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder

"""
ColumnTransformer:
- 서로 다른 타입의 특성에 다른 전처리 적용
- 수치형: 스케일링
- 범주형: 인코딩
"""

# 샘플 데이터
data = {
    'age': [25, 32, 47, 51, 62],
    'income': [50000, 60000, 80000, 120000, 95000],
    'gender': ['M', 'F', 'M', 'F', 'M'],
    'education': ['Bachelor', 'Master', 'PhD', 'Bachelor', 'Master'],
    'purchased': [0, 1, 1, 1, 0]
}
df = pd.DataFrame(data)

X = df.drop('purchased', axis=1)
y = df['purchased']

print("데이터 타입:")
print(X.dtypes)

2.2 ColumnTransformer 생성

# 특성 분류
numeric_features = ['age', 'income']
categorical_features = ['gender', 'education']

# ColumnTransformer 정의
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(drop='first', sparse_output=False), categorical_features)
    ],
    remainder='passthrough'  # 나머지 특성 처리: 'drop', 'passthrough'
)

# 변환
X_transformed = preprocessor.fit_transform(X)

print(f"원본 형상: {X.shape}")
print(f"변환 후 형상: {X_transformed.shape}")

# 변환된 특성 이름
feature_names = (
    numeric_features +
    list(preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_features))
)
print(f"특성 이름: {feature_names}")

2.3 Pipeline + ColumnTransformer

from sklearn.ensemble import RandomForestClassifier

# 전체 파이프라인
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# 학습 (작은 데이터이므로 전체 사용)
full_pipeline.fit(X, y)

# 예측
new_data = pd.DataFrame({
    'age': [30],
    'income': [70000],
    'gender': ['F'],
    'education': ['Master']
})
prediction = full_pipeline.predict(new_data)
print(f"예측: {prediction[0]}")

3. 복잡한 전처리 파이프라인

3.1 결측치 처리 포함

from sklearn.impute import SimpleImputer

# 결측치가 있는 데이터
data_missing = {
    'age': [25, np.nan, 47, 51, 62],
    'income': [50000, 60000, np.nan, 120000, 95000],
    'gender': ['M', 'F', 'M', None, 'M'],
    'education': ['Bachelor', 'Master', 'PhD', 'Bachelor', None],
    'purchased': [0, 1, 1, 1, 0]
}
df_missing = pd.DataFrame(data_missing)
X_missing = df_missing.drop('purchased', axis=1)
y_missing = df_missing['purchased']

# 수치형 파이프라인
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# 범주형 파이프라인
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))
])

# ColumnTransformer
preprocessor_full = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# 전체 파이프라인
complete_pipeline = Pipeline([
    ('preprocessor', preprocessor_full),
    ('classifier', RandomForestClassifier(random_state=42))
])

complete_pipeline.fit(X_missing, y_missing)
print("결측치 포함 파이프라인 학습 완료")

3.2 특성 선택 포함

from sklearn.feature_selection import SelectKBest, f_classif

# 특성 선택 포함 파이프라인
pipeline_with_selection = Pipeline([
    ('preprocessor', preprocessor_full),
    ('feature_selection', SelectKBest(score_func=f_classif, k='all')),
    ('classifier', RandomForestClassifier(random_state=42))
])

pipeline_with_selection.fit(X_missing, y_missing)
print("특성 선택 포함 파이프라인 학습 완료")

4. Pipeline과 교차 검증

4.1 올바른 교차 검증

from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.datasets import load_breast_cancer

# 데이터 로드
cancer = load_breast_cancer()
X, y = cancer.data, cancer.target

# 파이프라인 정의
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

# 교차 검증 (올바른 방법)
# 각 폴드에서 스케일러가 학습 데이터만으로 fit됨
scores = cross_val_score(pipeline, X, y, cv=5, scoring='accuracy')

print("교차 검증 결과:")
print(f"  각 폴드: {scores}")
print(f"  평균: {scores.mean():.4f} (+/- {scores.std():.4f})")

4.2 Pipeline 하이퍼파라미터 튜닝

# 파라미터 이름: step__parameter
param_grid = {
    'scaler': [StandardScaler(), MinMaxScaler()],
    'classifier__C': [0.1, 1, 10],
    'classifier__penalty': ['l1', 'l2'],
    'classifier__solver': ['liblinear']
}

# Grid Search
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)

grid_search.fit(X, y)

print("Grid Search 결과:")
print(f"  최적 파라미터: {grid_search.best_params_}")
print(f"  최적 점수: {grid_search.best_score_:.4f}")

4.3 복잡한 파라미터 그리드

from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

# 여러 모델 비교 파이프라인
pipeline_multi = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())  # placeholder
])

# 모델별 다른 파라미터
param_grid_multi = [
    {
        'classifier': [LogisticRegression(max_iter=1000)],
        'classifier__C': [0.1, 1, 10]
    },
    {
        'classifier': [RandomForestClassifier(random_state=42)],
        'classifier__n_estimators': [50, 100],
        'classifier__max_depth': [None, 5, 10]
    },
    {
        'classifier': [SVC()],
        'classifier__C': [0.1, 1],
        'classifier__kernel': ['rbf', 'linear']
    }
]

grid_search_multi = GridSearchCV(
    pipeline_multi,
    param_grid_multi,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)

grid_search_multi.fit(X, y)

print("여러 모델 비교 결과:")
print(f"  최적 모델: {type(grid_search_multi.best_params_['classifier']).__name__}")
print(f"  최적 파라미터: {grid_search_multi.best_params_}")
print(f"  최적 점수: {grid_search_multi.best_score_:.4f}")

5. 모델 저장과 로드

5.1 joblib 사용

import joblib

# 최적 모델 학습
best_pipeline = grid_search.best_estimator_

# 모델 저장
joblib.dump(best_pipeline, 'best_model.joblib')
print("모델 저장 완료: best_model.joblib")

# 모델 로드
loaded_model = joblib.load('best_model.joblib')

# 테스트
X_test_sample = X[:5]
predictions = loaded_model.predict(X_test_sample)
print(f"로드된 모델 예측: {predictions}")

5.2 pickle 사용

import pickle

# pickle 저장
with open('model.pkl', 'wb') as f:
    pickle.dump(best_pipeline, f)

# pickle 로드
with open('model.pkl', 'rb') as f:
    loaded_model_pkl = pickle.load(f)

print("pickle 모델 예측:", loaded_model_pkl.predict(X[:3]))

5.3 버전 관리

import sklearn
from datetime import datetime

# 메타데이터와 함께 저장
model_metadata = {
    'model': best_pipeline,
    'sklearn_version': sklearn.__version__,
    'training_date': datetime.now().isoformat(),
    'feature_names': list(cancer.feature_names),
    'target_names': list(cancer.target_names),
    'cv_score': grid_search.best_score_
}

joblib.dump(model_metadata, 'model_with_metadata.joblib')

# 로드 및 검증
loaded_metadata = joblib.load('model_with_metadata.joblib')
print(f"학습 날짜: {loaded_metadata['training_date']}")
print(f"sklearn 버전: {loaded_metadata['sklearn_version']}")
print(f"CV 점수: {loaded_metadata['cv_score']:.4f}")

6. FunctionTransformer

6.1 커스텀 변환 함수

from sklearn.preprocessing import FunctionTransformer

# 커스텀 변환 함수
def log_transform(X):
    return np.log1p(X)  # log(1 + x)

def add_polynomial_features(X):
    return np.c_[X, X ** 2, X ** 3]

# FunctionTransformer 생성
log_transformer = FunctionTransformer(log_transform, validate=True)
poly_transformer = FunctionTransformer(add_polynomial_features, validate=True)

# 파이프라인에서 사용
pipeline_custom = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

# 테스트
X_positive = np.abs(X) + 1  # 로그를 위해 양수로 변환
scores = cross_val_score(pipeline_custom, X_positive, y, cv=5)
print(f"커스텀 변환 파이프라인 CV 점수: {scores.mean():.4f}")

6.2 특성 추가 함수

# 도메인 특정 특성 추가
def create_ratio_features(X):
    """비율 특성 생성"""
    X = np.array(X)
    if X.shape[1] >= 2:
        ratio = (X[:, 0] / (X[:, 1] + 1e-10)).reshape(-1, 1)
        return np.c_[X, ratio]
    return X

ratio_transformer = FunctionTransformer(create_ratio_features)

# 파이프라인
pipeline_ratio = Pipeline([
    ('ratio_features', ratio_transformer),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

scores = cross_val_score(pipeline_ratio, X, y, cv=5)
print(f"비율 특성 추가 CV 점수: {scores.mean():.4f}")

7. 커스텀 Transformer

from sklearn.base import BaseEstimator, TransformerMixin

class OutlierRemover(BaseEstimator, TransformerMixin):
    """이상치 제거 트랜스포머"""

    def __init__(self, threshold=3):
        self.threshold = threshold
        self.mean_ = None
        self.std_ = None

    def fit(self, X, y=None):
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        return self

    def transform(self, X):
        X = np.array(X)
        z_scores = np.abs((X - self.mean_) / (self.std_ + 1e-10))
        # 이상치를 경계값으로 대체
        X_clipped = np.where(z_scores > self.threshold,
                             self.mean_ + self.threshold * self.std_ * np.sign(X - self.mean_),
                             X)
        return X_clipped


class FeatureSelector(BaseEstimator, TransformerMixin):
    """특성 선택 트랜스포머"""

    def __init__(self, feature_indices=None):
        self.feature_indices = feature_indices

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = np.array(X)
        if self.feature_indices is not None:
            return X[:, self.feature_indices]
        return X


# 커스텀 트랜스포머 사용
custom_pipeline = Pipeline([
    ('outlier', OutlierRemover(threshold=3)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

scores = cross_val_score(custom_pipeline, X, y, cv=5)
print(f"커스텀 트랜스포머 CV 점수: {scores.mean():.4f}")

8. 실전 전처리 템플릿

8.1 분류 문제 템플릿

from sklearn.compose import make_column_selector

def create_classification_pipeline(model, numeric_features=None, categorical_features=None):
    """분류 문제용 파이프라인 생성"""

    # 수치형 특성 파이프라인
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # 범주형 특성 파이프라인
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))
    ])

    # ColumnTransformer
    if numeric_features is None and categorical_features is None:
        # 자동 감지
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, make_column_selector(dtype_include=np.number)),
                ('cat', categorical_transformer, make_column_selector(dtype_include=object))
            ]
        )
    else:
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features or []),
                ('cat', categorical_transformer, categorical_features or [])
            ]
        )

    # 전체 파이프라인
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])

    return pipeline


# 사용 예시
from sklearn.ensemble import GradientBoostingClassifier

pipeline = create_classification_pipeline(
    GradientBoostingClassifier(random_state=42),
    numeric_features=['age', 'income'],
    categorical_features=['gender', 'education']
)

8.2 회귀 문제 템플릿

from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, r2_score

def create_regression_pipeline(model, numeric_features=None, categorical_features=None):
    """회귀 문제용 파이프라인 생성"""

    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore'))
    ])

    if numeric_features is None and categorical_features is None:
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, make_column_selector(dtype_include=np.number)),
                ('cat', categorical_transformer, make_column_selector(dtype_include=object))
            ]
        )
    else:
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features or []),
                ('cat', categorical_transformer, categorical_features or [])
            ]
        )

    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('regressor', model)
    ])

    return pipeline

9. 모델 배포 고려사항

9.1 예측 함수 래핑

class ModelWrapper:
    """배포용 모델 래퍼"""

    def __init__(self, model_path):
        self.model = joblib.load(model_path)
        self.feature_names = None

    def set_feature_names(self, names):
        self.feature_names = names

    def predict(self, input_data):
        """딕셔너리 또는 DataFrame 입력 처리"""
        if isinstance(input_data, dict):
            input_data = pd.DataFrame([input_data])

        if self.feature_names:
            input_data = input_data[self.feature_names]

        return self.model.predict(input_data)

    def predict_proba(self, input_data):
        if isinstance(input_data, dict):
            input_data = pd.DataFrame([input_data])

        if self.feature_names:
            input_data = input_data[self.feature_names]

        return self.model.predict_proba(input_data)


# 사용 예시
# wrapper = ModelWrapper('best_model.joblib')
# wrapper.set_feature_names(['age', 'income', 'gender', 'education'])
# prediction = wrapper.predict({'age': 30, 'income': 70000, 'gender': 'M', 'education': 'Bachelor'})

9.2 입력 검증

def validate_input(data, expected_columns, expected_dtypes=None):
    """입력 데이터 검증"""
    errors = []

    # 필수 컬럼 확인
    missing_cols = set(expected_columns) - set(data.columns)
    if missing_cols:
        errors.append(f"누락된 컬럼: {missing_cols}")

    # 데이터 타입 확인
    if expected_dtypes:
        for col, dtype in expected_dtypes.items():
            if col in data.columns and not np.issubdtype(data[col].dtype, dtype):
                errors.append(f"잘못된 타입 - {col}: {data[col].dtype} (기대: {dtype})")

    # 결측치 확인
    null_counts = data[expected_columns].isnull().sum()
    null_cols = null_counts[null_counts > 0]
    if len(null_cols) > 0:
        print(f"경고: 결측치 발견 - {dict(null_cols)}")

    if errors:
        raise ValueError("\n".join(errors))

    return True

10. 실전 체크리스트

"""
ML 프로젝트 체크리스트:

1. 데이터 준비
   [ ] 데이터 로드 및 기본 탐색
   [ ] 타겟 변수 정의
   [ ] 학습/검증/테스트 분할

2. 탐색적 데이터 분석 (EDA)
   [ ] 결측치 확인
   [ ] 이상치 확인
   [ ] 특성 분포 확인
   [ ] 타겟과의 상관관계

3. 전처리 파이프라인
   [ ] 수치형 특성 처리 (스케일링, 결측치)
   [ ] 범주형 특성 처리 (인코딩, 결측치)
   [ ] 특성 선택/생성

4. 모델링
   [ ] 기준선 모델 설정
   [ ] 여러 모델 비교
   [ ] 하이퍼파라미터 튜닝
   [ ] 교차 검증

5. 평가
   [ ] 적절한 평가 지표 선택
   [ ] 과적합/과소적합 확인
   [ ] 오차 분석

6. 배포
   [ ] 모델 저장
   [ ] 입력 검증
   [ ] 예측 함수 래핑
   [ ] 모니터링 계획
"""

연습 문제

문제 1: 기본 Pipeline

Iris 데이터에 스케일링 + PCA + 로지스틱 회귀 파이프라인을 만드세요.

from sklearn.datasets import load_iris
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

iris = load_iris()
X, y = iris.data, iris.target

# 풀이
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression())
])

scores = cross_val_score(pipeline, X, y, cv=5)
print(f"CV 점수: {scores.mean():.4f}")

문제 2: ColumnTransformer

수치형과 범주형 특성을 다르게 처리하는 파이프라인을 만드세요.

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# 샘플 데이터
data = pd.DataFrame({
    'age': [25, 30, 35, 40],
    'income': [50000, 60000, 70000, 80000],
    'city': ['A', 'B', 'A', 'C']
})

# 풀이
numeric_features = ['age', 'income']
categorical_features = ['city']

preprocessor = ColumnTransformer([
    ('num', StandardScaler(), numeric_features),
    ('cat', OneHotEncoder(), categorical_features)
])

X_transformed = preprocessor.fit_transform(data)
print(f"변환 후 형상: {X_transformed.shape}")

문제 3: 모델 저장 및 로드

학습된 파이프라인을 저장하고 로드하세요.

import joblib

# 학습
pipeline.fit(X, y)

# 저장
joblib.dump(pipeline, 'iris_pipeline.joblib')

# 로드
loaded_pipeline = joblib.load('iris_pipeline.joblib')

# 테스트
print(f"로드된 모델 정확도: {loaded_pipeline.score(X, y):.4f}")

요약

구성 요소 용도 예시
Pipeline 단계 순차 연결 스케일링 → PCA → 모델
ColumnTransformer 특성별 다른 처리 수치형/범주형 분리
FunctionTransformer 커스텀 함수 로그 변환
make_pipeline 자동 이름 지정 간단한 파이프라인

Pipeline 하이퍼파라미터 명명 규칙

step_name__parameter_name

예시:
- classifier__C: 분류기의 C 파라미터
- preprocessor__num__scaler__with_mean: 중첩된 파라미터

모델 저장 비교

방법 장점 단점
joblib 대용량 NumPy 효율적 sklearn 전용
pickle 표준 라이브러리 대용량 느림
ONNX 프레임워크 독립적 변환 필요

실무 팁

  1. 항상 Pipeline 사용하여 데이터 누수 방지
  2. ColumnTransformer로 전처리 명확하게 분리
  3. 모델 저장 시 메타데이터 포함
  4. 입력 검증 함수 작성
  5. 버전 관리 철저히
to navigate between lessons