Kubeflow Pipelines

Kubeflow Pipelines

1. Kubeflow κ°œμš”

KubeflowλŠ” Kubernetes μœ„μ—μ„œ ML μ›Œν¬ν”Œλ‘œμš°λ₯Ό ꡬ좕, 배포, κ΄€λ¦¬ν•˜κΈ° μœ„ν•œ μ˜€ν”ˆμ†ŒμŠ€ ν”Œλž«νΌμž…λ‹ˆλ‹€.

1.1 Kubeflow μ»΄ν¬λ„ŒνŠΈ

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                      Kubeflow μƒνƒœκ³„                                 β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                     β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”‚
β”‚   β”‚  Pipelines  β”‚    β”‚   Katib     β”‚    β”‚  Training   β”‚            β”‚
β”‚   β”‚             β”‚    β”‚             β”‚    β”‚  Operators  β”‚            β”‚
β”‚   β”‚ - μ›Œν¬ν”Œλ‘œμš° β”‚    β”‚ - AutoML    β”‚    β”‚             β”‚            β”‚
β”‚   β”‚ - νŒŒμ΄ν”„λΌμΈ β”‚    β”‚ - ν•˜μ΄νΌ    β”‚    β”‚ - TFJob     β”‚            β”‚
β”‚   β”‚   μ˜€μΌ€μŠ€νŠΈ   β”‚    β”‚   νŒŒλΌλ―Έν„°  β”‚    β”‚ - PyTorchJobβ”‚            β”‚
β”‚   β”‚   λ ˆμ΄μ…˜    β”‚    β”‚   νŠœλ‹      β”‚    β”‚ - MXJob     β”‚            β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β”‚
β”‚                                                                     β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”‚
β”‚   β”‚   KServe    β”‚    β”‚  Notebooks  β”‚    β”‚   Central   β”‚            β”‚
β”‚   β”‚             β”‚    β”‚             β”‚    β”‚  Dashboard  β”‚            β”‚
β”‚   β”‚ - λͺ¨λΈ μ„œλΉ™  β”‚    β”‚ - Jupyter   β”‚    β”‚             β”‚            β”‚
β”‚   β”‚ - A/B ν…ŒμŠ€νŠΈβ”‚    β”‚   ν™˜κ²½      β”‚    β”‚ - UI       β”‚            β”‚
β”‚   β”‚ - μΉ΄λ‚˜λ¦¬    β”‚    β”‚             β”‚    β”‚ - 관리      β”‚            β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β”‚
β”‚                                                                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1.2 μ„€μΉ˜

# KFP SDK μ„€μΉ˜
pip install kfp

# 버전 확인
python -c "import kfp; print(kfp.__version__)"

# Kubeflow Pipelines ν΄λŸ¬μŠ€ν„° μ„€μΉ˜ (minikube μ˜ˆμ‹œ)
# https://www.kubeflow.org/docs/started/installing-kubeflow/

2. Kubeflow Pipelines SDK

2.1 κΈ°λ³Έ κ°œλ…

"""
KFP κΈ°λ³Έ κ°œλ…
"""

# 핡심 μš©μ–΄
# Core KFP terminology, keyed by concept name.
kfp_concepts = dict(
    Pipeline="ML μ›Œν¬ν”Œλ‘œμš°λ₯Ό μ •μ˜ν•˜λŠ” DAG (Directed Acyclic Graph)",
    Component="νŒŒμ΄ν”„λΌμΈμ˜ κ°œλ³„ 단계 (ν•¨μˆ˜ λ˜λŠ” μ»¨ν…Œμ΄λ„ˆ)",
    Run="νŒŒμ΄ν”„λΌμΈμ˜ ν•œ 번 μ‹€ν–‰",
    Experiment="κ΄€λ ¨ μ‹€ν–‰λ“€μ˜ κ·Έλ£Ή",
    Artifact="μ»΄ν¬λ„ŒνŠΈ κ°„ μ „λ‹¬λ˜λŠ” 데이터",
)

2.2 κ°„λ‹¨ν•œ νŒŒμ΄ν”„λΌμΈ

"""
KFP v2 κΈ°λ³Έ νŒŒμ΄ν”„λΌμΈ
"""

from kfp import dsl
from kfp import compiler

# μ»΄ν¬λ„ŒνŠΈ μ •μ˜
@dsl.component(packages_to_install=["pandas"])
def preprocess_data(input_path: str, output_path: dsl.OutputPath(str)):
    """Data preprocessing component.

    Reads a CSV from ``input_path``, drops rows containing missing
    values, and writes the cleaned CSV to ``output_path``.

    Fix: declare ``packages_to_install=["pandas"]`` β€” the default
    base image does not ship pandas, so the component would fail at
    runtime (the later components in this file already do this).
    """
    import pandas as pd

    df = pd.read_csv(input_path)
    # Preprocessing logic: drop incomplete rows.
    df_processed = df.dropna()
    df_processed.to_csv(output_path, index=False)

@dsl.component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_model(
    data_path: str,
    n_estimators: int,
    model_path: dsl.OutputPath(str)
):
    """Model training component.

    Fits a RandomForestClassifier on the CSV at ``data_path``
    (label column: "target") and saves the fitted estimator with
    joblib to ``model_path``.

    Fix: declare the packages the body imports β€” without
    ``packages_to_install`` the default base image lacks pandas,
    scikit-learn, and joblib, and the component fails at runtime.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    df = pd.read_csv(data_path)
    X = df.drop("target", axis=1)
    y = df["target"]

    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X, y)

    joblib.dump(model, model_path)

@dsl.component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def evaluate_model(
    model_path: str,
    test_data_path: str
) -> float:
    """Model evaluation component.

    Loads the joblib model at ``model_path``, scores it against the
    CSV at ``test_data_path`` (label column: "target"), and returns
    the accuracy.

    Fix: declare the packages the body imports via
    ``packages_to_install`` so the component can run on the default
    base image.
    """
    import pandas as pd
    from sklearn.metrics import accuracy_score
    import joblib

    model = joblib.load(model_path)
    df = pd.read_csv(test_data_path)
    X = df.drop("target", axis=1)
    y = df["target"]

    predictions = model.predict(X)
    accuracy = accuracy_score(y, predictions)

    return accuracy

# νŒŒμ΄ν”„λΌμΈ μ •μ˜
@dsl.pipeline(
    name="ML Training Pipeline",
    description="A simple ML training pipeline"
)
def ml_pipeline(
    input_data_path: str = "gs://bucket/data.csv",
    n_estimators: int = 100
):
    """Three-step pipeline: preprocess -> train -> evaluate."""
    # Step 1: clean the raw data.
    prep = preprocess_data(input_path=input_data_path)

    # Step 2: fit the model on the cleaned data.
    training = train_model(
        data_path=prep.outputs["output_path"],
        n_estimators=n_estimators,
    )

    # Step 3: score the trained model (reuses the cleaned data).
    evaluate_model(
        model_path=training.outputs["model_path"],
        test_data_path=prep.outputs["output_path"],
    )

# νŒŒμ΄ν”„λΌμΈ 컴파일
# Compile the pipeline definition into a reusable YAML package.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml",
)

3. μ»΄ν¬λ„ŒνŠΈ μž‘μ„±

3.1 Python ν•¨μˆ˜ μ»΄ν¬λ„ŒνŠΈ

"""
Python ν•¨μˆ˜ 기반 μ»΄ν¬λ„ŒνŠΈ
"""

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

# κΈ°λ³Έ μ»΄ν¬λ„ŒνŠΈ
@dsl.component(
    base_image="python:3.9-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def train_sklearn_model(
    training_data: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 5
):
    """Train a random-forest classifier and log cross-validation metrics."""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib
    import json

    # Load the training table; "target" is the label column.
    frame = pd.read_csv(training_data.path)
    features = frame.drop("target", axis=1)
    labels = frame["target"]

    # Fit the classifier with a fixed seed for reproducibility.
    estimator = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    estimator.fit(features, labels)

    # 5-fold cross-validation on the full training set.
    scores = cross_val_score(estimator, features, labels, cv=5)

    # Persist the fitted model to the output artifact path.
    joblib.dump(estimator, model.path)

    # Record summary metrics on the Metrics artifact.
    metrics.log_metric("cv_mean", float(scores.mean()))
    metrics.log_metric("cv_std", float(scores.std()))
    metrics.log_metric("n_features", int(features.shape[1]))

# GPU μ‚¬μš© μ»΄ν¬λ„ŒνŠΈ
@dsl.component(
    base_image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
    packages_to_install=["transformers"]
)
def train_pytorch_model(
    data: Input[Dataset],
    model: Output[Model],
    epochs: int = 10,
    learning_rate: float = 0.001
):
    """Train a PyTorch model on GPU (placeholder).

    Runs in a CUDA-enabled PyTorch base image with transformers
    installed; the actual training loop is intentionally left
    unimplemented in this example.
    """
    import torch
    # GPU training code would go here...
    pass

3.2 μ»¨ν…Œμ΄λ„ˆ 기반 μ»΄ν¬λ„ŒνŠΈ

"""
Docker μ»¨ν…Œμ΄λ„ˆ 기반 μ»΄ν¬λ„ŒνŠΈ
"""

from kfp import dsl
from kfp.dsl import ContainerSpec

# 방법 1: container_spec 직접 μ •μ˜
@dsl.container_component
def custom_training_component(
    data_path: str,
    output_path: dsl.OutputPath(str),
    epochs: int
):
    """Run a custom training image with CLI-style arguments."""
    # Build the argument vector first, then wrap it in the spec.
    cli_args = [
        "--data-path", data_path,
        "--output-path", output_path,
        "--epochs", str(epochs),
    ]
    return ContainerSpec(
        image="gcr.io/my-project/training-image:latest",
        command=["python", "train.py"],
        args=cli_args,
    )

# 방법 2: YAML μ»΄ν¬λ„ŒνŠΈ μ •μ˜
# Reusable component defined as a component-spec YAML document.
component_yaml = """
name: Training Component
description: Custom training component
inputs:
  - name: data_path
    type: String
  - name: epochs
    type: Integer
    default: '10'
outputs:
  - name: model_path
    type: String
implementation:
  container:
    image: gcr.io/my-project/training:latest
    command:
      - python
      - train.py
    args:
      - --data-path
      - {inputValue: data_path}
      - --epochs
      - {inputValue: epochs}
      - --output-path
      - {outputPath: model_path}
"""

# Load a component factory from the YAML text above.
from kfp.components import load_component_from_text
training_op = load_component_from_text(component_yaml)

3.3 μž¬μ‚¬μš© κ°€λŠ₯ν•œ μ»΄ν¬λ„ŒνŠΈ

"""
μž¬μ‚¬μš© κ°€λŠ₯ν•œ μ»΄ν¬λ„ŒνŠΈ 라이브러리
"""

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model

# components/data_processing.py
@dsl.component(packages_to_install=["pandas", "numpy", "scikit-learn"])
def load_and_split_data(
    input_path: str,
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2,
    random_state: int = 42
):
    """Load a CSV and split it into train/test Dataset artifacts.

    Fix: the body imports ``sklearn.model_selection.train_test_split``
    but "scikit-learn" was missing from ``packages_to_install``, so
    the component would fail with ImportError at runtime.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_path)
    # Seeded split so repeated runs produce identical partitions.
    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=random_state
    )

    train_df.to_csv(train_data.path, index=False)
    test_df.to_csv(test_data.path, index=False)

@dsl.component(packages_to_install=["pandas", "scikit-learn"])
def feature_engineering(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    numerical_features: list,
    categorical_features: list
):
    """Scale numerical columns and label-encode categorical columns."""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler, LabelEncoder

    frame = pd.read_csv(input_data.path)

    # Standardize all numerical columns in a single pass.
    frame[numerical_features] = StandardScaler().fit_transform(
        frame[numerical_features]
    )

    # Fit a fresh LabelEncoder per categorical column.
    for column in categorical_features:
        frame[column] = LabelEncoder().fit_transform(frame[column])

    frame.to_csv(output_data.path, index=False)

4. νŒŒμ΄ν”„λΌμΈ κ³ κΈ‰ κΈ°λŠ₯

4.1 쑰건뢀 μ‹€ν–‰

"""
쑰건뢀 μ‹€ν–‰κ³Ό λΆ„κΈ°
"""

from kfp import dsl

@dsl.component(packages_to_install=["pandas"])
def check_data_quality(data_path: str) -> bool:
    """Data-quality check: True when the CSV has no missing values.

    Fixes: (1) declare pandas in ``packages_to_install`` so the
    default base image can run the body; (2) cast the comparison to
    a plain ``bool`` β€” pandas returns a numpy bool, which may not
    serialize as the declared ``bool`` output type.
    """
    import pandas as pd
    df = pd.read_csv(data_path)
    # Quality rule: zero null cells across the whole frame.
    return bool(df.isnull().sum().sum() == 0)

@dsl.component(packages_to_install=["pandas"])
def clean_data(data_path: str, output_path: dsl.OutputPath(str)):
    """Data cleaning: drop rows containing missing values.

    Fix: declare pandas in ``packages_to_install`` β€” the default
    base image does not include it.
    """
    import pandas as pd
    df = pd.read_csv(data_path)
    df = df.dropna()
    df.to_csv(output_path, index=False)

@dsl.component
def train_model(data_path: str):
    """Train a model on the data at ``data_path`` (placeholder body)."""
    pass

@dsl.pipeline(name="conditional-pipeline")
def conditional_pipeline(data_path: str):
    """Branch on a data-quality check: clean first if needed, else train directly."""
    quality = check_data_quality(data_path=data_path)

    # Dirty data: clean it, then train on the cleaned output.
    with dsl.Condition(quality.output == False, name="need-cleaning"):
        cleaned = clean_data(data_path=data_path)
        train_model(data_path=cleaned.outputs["output_path"])

    # Clean data: train on the raw input as-is.
    with dsl.Condition(quality.output == True, name="no-cleaning"):
        train_model(data_path=data_path)

4.2 반볡 μ‹€ν–‰

"""
ParallelForλ₯Ό μ‚¬μš©ν•œ 반볡 μ‹€ν–‰
"""

from kfp import dsl
from typing import List

@dsl.component
def train_with_params(
    data_path: str,
    params: dict
) -> float:
    """Train a model with the given hyperparameters (placeholder).

    Returns a fixed accuracy of 0.95; a real implementation would
    train on ``data_path`` using ``params`` and return the score.
    """
    # Training logic would go here.
    return 0.95

@dsl.component
def select_best_model(
    accuracies: List[float],
    param_sets: List[dict]
) -> dict:
    """Return the parameter set whose run scored the highest accuracy.

    Ties resolve to the earliest index, matching list.index semantics.
    """
    best_idx = max(range(len(accuracies)), key=lambda i: accuracies[i])
    return param_sets[best_idx]

@dsl.pipeline(name="hyperparameter-search")
def hyperparameter_search_pipeline(data_path: str):
    """Fan training out over a fixed grid of hyperparameter sets."""
    # Candidate hyperparameter combinations to evaluate.
    search_space = [
        {"n_estimators": 50, "max_depth": 3},
        {"n_estimators": 100, "max_depth": 5},
        {"n_estimators": 200, "max_depth": 10}
    ]

    # One training task per combination, executed in parallel.
    with dsl.ParallelFor(search_space) as combo:
        train_with_params(
            data_path=data_path,
            params=combo
        )

    # Result aggregation would happen outside the ParallelFor:
    # select_best_model(...)

4.3 λ¦¬μ†ŒμŠ€ μ„€μ •

"""
μ»΄ν¬λ„ŒνŠΈ λ¦¬μ†ŒμŠ€ μ„€μ •
"""

from kfp import dsl
from kfp import kubernetes

@dsl.component
def gpu_training(data_path: str):
    """GPU training component (placeholder body)."""
    pass

@dsl.pipeline(name="resource-pipeline")
def resource_pipeline(data_path: str):
    """Attach CPU/memory/GPU resources, an env var, and a PVC to a training task."""
    job = gpu_training(data_path=data_path)

    # CPU / memory requests and limits.
    job.set_cpu_limit("4")
    job.set_memory_limit("16Gi")
    job.set_cpu_request("2")
    job.set_memory_request("8Gi")

    # Schedule onto a T4 node and claim one GPU.
    kubernetes.add_node_selector(
        job,
        label_key="cloud.google.com/gke-accelerator",
        label_value="nvidia-tesla-t4"
    )
    job.set_accelerator_type("nvidia.com/gpu")
    job.set_accelerator_limit(1)

    # Environment variable for the container.
    job.set_env_variable("CUDA_VISIBLE_DEVICES", "0")

    # Mount a persistent volume for data access.
    kubernetes.mount_pvc(
        job,
        pvc_name="data-pvc",
        mount_path="/data"
    )

5. Kubernetes 톡합

5.1 μ‹œν¬λ¦Ώ 및 ConfigMap

"""
Kubernetes λ¦¬μ†ŒμŠ€ 연동
"""

from kfp import dsl
from kfp import kubernetes

@dsl.component
def component_with_secrets():
    """Component that reads its API key from the environment.

    The API_KEY variable is expected to be injected into the pod
    (e.g. from a Kubernetes secret).
    """
    import os
    api_token = os.environ.get("API_KEY")
    # ...

@dsl.pipeline(name="k8s-resources-pipeline")
def k8s_pipeline():
    """Wire secrets and config maps into a component's pod."""
    worker = component_with_secrets()

    # Expose a secret value as an environment variable.
    kubernetes.use_secret_as_env(
        worker,
        secret_name="api-credentials",
        secret_key_to_env={"api-key": "API_KEY"}
    )

    # Expose a ConfigMap value as an environment variable.
    kubernetes.use_config_map_as_env(
        worker,
        config_map_name="app-config",
        config_map_key_to_env={"setting": "APP_SETTING"}
    )

    # Mount a secret as files inside the container.
    kubernetes.use_secret_as_volume(
        worker,
        secret_name="tls-certs",
        mount_path="/certs"
    )

5.2 μ„œλΉ„μŠ€ μ–΄μΉ΄μš΄νŠΈ

"""
μ„œλΉ„μŠ€ μ–΄μΉ΄μš΄νŠΈ μ„€μ •
"""

from kfp import dsl
from kfp import kubernetes

@dsl.pipeline(name="sa-pipeline")
def service_account_pipeline():
    """Pipeline demonstrating per-task service account and pull-secret setup."""
    # NOTE(review): `some_component` is not defined anywhere in this file β€”
    # it is a placeholder; substitute a real component before compiling.
    task = some_component()

    # Run the task's pod under a specific Kubernetes service account.
    kubernetes.set_service_account(
        task,
        service_account="ml-pipeline-sa"
    )

    # Pull-secret for pulling images from a private registry.
    # NOTE(review): verify this helper name against the installed
    # kfp-kubernetes version (newer releases expose set_image_pull_secrets).
    kubernetes.add_image_pull_secret(
        task,
        secret_name="docker-registry-secret"
    )

6. νŒŒμ΄ν”„λΌμΈ μ‹€ν–‰

6.1 SDK둜 μ‹€ν–‰

"""
KFP SDK둜 νŒŒμ΄ν”„λΌμΈ μ‹€ν–‰
"""

from kfp import compiler
from kfp.client import Client

# νŒŒμ΄ν”„λΌμΈ 컴파일
# Compile the pipeline into a YAML package.
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="pipeline.yaml",
)

# Connect to the KFP API server.
client = Client(host="http://kubeflow-pipelines-api:8888")

# Create (or fetch) the experiment that groups related runs.
experiment = client.create_experiment(
    name="my-experiment",
    description="ML training experiments",
)

# Submit one run of the compiled pipeline with concrete arguments.
submitted = client.create_run_from_pipeline_package(
    pipeline_file="pipeline.yaml",
    experiment_id=experiment.experiment_id,
    run_name="training-run-001",
    arguments={
        "input_data_path": "gs://bucket/data.csv",
        "n_estimators": 200,
    },
)

print(f"Run ID: {submitted.run_id}")
print(f"Run URL: {client.get_run(submitted.run_id).display_url}")

# Block until the run finishes (up to one hour).
client.wait_for_run_completion(submitted.run_id, timeout=3600)

# Inspect the final state.
final_state = client.get_run(submitted.run_id)
print(f"Status: {final_state.state}")

6.2 μŠ€μΌ€μ€„λ§

"""
νŒŒμ΄ν”„λΌμΈ μŠ€μΌ€μ€„λ§
"""

from kfp.client import Client

client = Client(host="http://kubeflow-pipelines-api:8888")

# Create a cron-scheduled (recurring) run of the compiled pipeline.
schedule = client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="daily-training",
    pipeline_package_path="pipeline.yaml",
    cron_expression="0 2 * * *",  # every day at 02:00
    max_concurrency=1,
    arguments={
        "input_data_path": "gs://bucket/daily_data/"
    }
)

print(f"Recurring Run ID: {schedule.id}")

# Pause the schedule.
client.disable_recurring_run(schedule.id)

# Resume the schedule.
client.enable_recurring_run(schedule.id)

6.3 μ‹€ν–‰ κ²°κ³Ό 쑰회

"""
μ‹€ν–‰ κ²°κ³Ό 및 μ•„ν‹°νŒ©νŠΈ 쑰회
"""

from kfp.client import Client

client = Client()

# Details for a single run.
run_info = client.get_run("run-id")
print(f"State: {run_info.state}")
print(f"Created: {run_info.created_at}")
print(f"Finished: {run_info.finished_at}")

# Page through runs in the experiment, newest first.
page = client.list_runs(
    experiment_id=experiment.experiment_id,
    page_size=10,
    sort_by="created_at desc"
)

for item in page.runs:
    print(f"{item.name}: {item.state}")

# Pipeline outputs / artifacts themselves are typically stored in
# object storage (GCS, S3, etc.) rather than fetched via the client.

7. μ‹€μ „ νŒŒμ΄ν”„λΌμΈ 예제

"""
μ™„μ „ν•œ ML νŒŒμ΄ν”„λΌμΈ 예제
"""

from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(packages_to_install=["pandas", "scikit-learn"])
def ingest_data(
    source_path: str,
    output_data: Output[Dataset]
):
    """Copy the source CSV into a tracked Dataset artifact."""
    import pandas as pd
    raw = pd.read_csv(source_path)
    raw.to_csv(output_data.path, index=False)

@dsl.component(packages_to_install=["pandas", "great-expectations"])
def validate_data(
    input_data: Input[Dataset],
    validation_report: Output[Dataset]
) -> bool:
    """Check basic data-quality rules and persist a small report.

    "Valid" means: more than 100 rows and under 10% missing cells.
    """
    import pandas as pd
    frame = pd.read_csv(input_data.path)

    row_count = len(frame)
    missing_ratio = frame.isnull().sum().sum() / frame.size
    is_valid = row_count > 100 and missing_ratio < 0.1

    # Persist a one-row CSV report alongside the boolean result.
    pd.DataFrame([{"valid": is_valid, "rows": row_count}]).to_csv(
        validation_report.path, index=False
    )

    return is_valid

@dsl.component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_and_evaluate(
    train_data: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100
) -> float:
    """Train a random forest, log metrics, and return test accuracy.

    Fix: seed ``train_test_split`` with ``random_state=42`` so the
    logged accuracy/F1 are reproducible across runs β€” the classifier
    itself was already seeded, but the split was not.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    df = pd.read_csv(train_data.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    # Hold out 20% for evaluation; fixed seed for reproducible metrics.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    clf = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='macro')

    # Persist the model artifact, then log evaluation metrics.
    joblib.dump(clf, model.path)

    metrics.log_metric("accuracy", float(accuracy))
    metrics.log_metric("f1_score", float(f1))

    return accuracy

@dsl.component
def deploy_model(
    model: Input[Model],
    accuracy: float,
    min_accuracy: float = 0.8
) -> str:
    """Deploy the model only when accuracy meets the threshold."""
    if accuracy >= min_accuracy:
        # Deployment logic (e.g. push the model to serving infrastructure)
        return f"Model deployed with accuracy {accuracy}"
    return "Model not deployed: accuracy below threshold"

@dsl.pipeline(
    name="End-to-End ML Pipeline",
    description="Complete ML pipeline with data validation, training, and deployment"
)
def e2e_ml_pipeline(
    data_source: str = "gs://bucket/data.csv",
    n_estimators: int = 100,
    min_accuracy: float = 0.8
):
    """Ingest -> validate -> (if valid) train -> deploy."""
    # 1. Ingest the raw data.
    ingestion = ingest_data(source_path=data_source)

    # 2. Validate the ingested dataset.
    validation = validate_data(
        input_data=ingestion.outputs["output_data"]
    )

    # 3. Train only when validation passed.
    with dsl.Condition(validation.output == True, name="data-valid"):
        training = train_and_evaluate(
            train_data=ingestion.outputs["output_data"],
            n_estimators=n_estimators
        )

        # 4. Gate deployment on the achieved accuracy.
        deploy_model(
            model=training.outputs["model"],
            accuracy=training.output,
            min_accuracy=min_accuracy
        )

# 컴파일
# Compile the end-to-end pipeline to a YAML package.
e2e_compiler = compiler.Compiler()
e2e_compiler.compile(
    pipeline_func=e2e_ml_pipeline,
    package_path="e2e_pipeline.yaml",
)

μ—°μŠ΅ 문제

문제 1: κΈ°λ³Έ νŒŒμ΄ν”„λΌμΈ

3단계 νŒŒμ΄ν”„λΌμΈμ„ μž‘μ„±ν•˜μ„Έμš”: 데이터 λ‘œλ“œ -> μ „μ²˜λ¦¬ -> λͺ¨λΈ ν•™μŠ΅

문제 2: ν•˜μ΄νΌνŒŒλΌλ―Έν„° 검색

ParallelForλ₯Ό μ‚¬μš©ν•˜μ—¬ μ—¬λŸ¬ ν•˜μ΄νΌνŒŒλΌλ―Έν„° 쑰합을 λ³‘λ ¬λ‘œ μ‹€ν—˜ν•˜λŠ” νŒŒμ΄ν”„λΌμΈμ„ μž‘μ„±ν•˜μ„Έμš”.

문제 3: μŠ€μΌ€μ€„λ§

λ§€μ£Ό μ›”μš”μΌ μƒˆλ²½μ— μ‹€ν–‰λ˜λŠ” μž¬ν•™μŠ΅ νŒŒμ΄ν”„λΌμΈμ„ μ„€μ •ν•˜μ„Έμš”.


μš”μ•½

κ°œλ… μ„€λͺ…
Pipeline ML μ›Œν¬ν”Œλ‘œμš° DAG
Component νŒŒμ΄ν”„λΌμΈμ˜ κ°œλ³„ 단계
@dsl.component Python ν•¨μˆ˜ μ»΄ν¬λ„ŒνŠΈ
@dsl.pipeline νŒŒμ΄ν”„λΌμΈ μ •μ˜
dsl.Condition 쑰건뢀 μ‹€ν–‰
dsl.ParallelFor 병렬 반볡 μ‹€ν–‰

참고 자료

Use the previous/next links to navigate between lessons.