Kubeflow Pipelines
1. Kubeflow Overview
Kubeflow is an open-source platform for building, deploying, and managing ML workflows on Kubernetes.
1.1 Kubeflow Components
┌────────────────────────────────────────────────────────────┐
│                     Kubeflow Ecosystem                     │
├────────────────────────────────────────────────────────────┤
│                                                            │
│ ┌────────────────┐  ┌────────────────┐  ┌────────────────┐ │
│ │   Pipelines    │  │     Katib      │  │    Training    │ │
│ │                │  │                │  │   Operators    │ │
│ │ - Workflows    │  │ - AutoML       │  │                │ │
│ │ - Orchestration│  │ - Hyperparam.  │  │ - TFJob        │ │
│ │                │  │   tuning       │  │ - PyTorchJob   │ │
│ │                │  │                │  │ - MXJob        │ │
│ └────────────────┘  └────────────────┘  └────────────────┘ │
│                                                            │
│ ┌────────────────┐  ┌────────────────┐  ┌────────────────┐ │
│ │     KServe     │  │   Notebooks    │  │    Central     │ │
│ │                │  │                │  │   Dashboard    │ │
│ │ - Model serving│  │ - Jupyter      │  │                │ │
│ │ - A/B testing  │  │   environments │  │ - UI           │ │
│ │ - Canary       │  │                │  │ - Management   │ │
│ └────────────────┘  └────────────────┘  └────────────────┘ │
│                                                            │
└────────────────────────────────────────────────────────────┘
1.2 Installation
# Install the KFP SDK
pip install kfp
# Check the version
python -c "import kfp; print(kfp.__version__)"
# Install Kubeflow Pipelines on a cluster (e.g. on minikube)
# https://www.kubeflow.org/docs/started/installing-kubeflow/
2. Kubeflow Pipelines SDK
2.1 Basic Concepts
"""
KFP κΈ°λ³Έ κ°λ
"""
# ν΅μ¬ μ©μ΄
kfp_concepts = {
"Pipeline": "ML μν¬νλ‘μ°λ₯Ό μ μνλ DAG (Directed Acyclic Graph)",
"Component": "νμ΄νλΌμΈμ κ°λ³ λ¨κ³ (ν¨μ λλ 컨ν
μ΄λ)",
"Run": "νμ΄νλΌμΈμ ν λ² μ€ν",
"Experiment": "κ΄λ ¨ μ€νλ€μ κ·Έλ£Ή",
"Artifact": "μ»΄ν¬λνΈ κ° μ λ¬λλ λ°μ΄ν°"
}
2.2 A Simple Pipeline
"""
A basic KFP v2 pipeline
"""
from kfp import dsl
from kfp import compiler

# Component definitions
@dsl.component(packages_to_install=["pandas"])
def preprocess_data(input_path: str, output_path: dsl.OutputPath(str)):
    """Data preprocessing component"""
    import pandas as pd

    df = pd.read_csv(input_path)
    # Preprocessing logic
    df_processed = df.dropna()
    df_processed.to_csv(output_path, index=False)

@dsl.component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_model(
    data_path: str,
    n_estimators: int,
    model_path: dsl.OutputPath(str)
):
    """Model training component"""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    df = pd.read_csv(data_path)
    X = df.drop("target", axis=1)
    y = df["target"]
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X, y)
    joblib.dump(model, model_path)

@dsl.component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def evaluate_model(
    model_path: str,
    test_data_path: str
) -> float:
    """Model evaluation component"""
    import pandas as pd
    from sklearn.metrics import accuracy_score
    import joblib

    model = joblib.load(model_path)
    df = pd.read_csv(test_data_path)
    X = df.drop("target", axis=1)
    y = df["target"]
    predictions = model.predict(X)
    accuracy = accuracy_score(y, predictions)
    return float(accuracy)

# Pipeline definition
@dsl.pipeline(
    name="ml-training-pipeline",
    description="A simple ML training pipeline"
)
def ml_pipeline(
    input_data_path: str = "gs://bucket/data.csv",
    n_estimators: int = 100
):
    # Step 1: preprocess the data
    preprocess_task = preprocess_data(input_path=input_data_path)
    # Step 2: train the model
    train_task = train_model(
        data_path=preprocess_task.outputs["output_path"],
        n_estimators=n_estimators
    )
    # Step 3: evaluate
    evaluate_task = evaluate_model(
        model_path=train_task.outputs["model_path"],
        test_data_path=preprocess_task.outputs["output_path"]
    )

# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml"
)
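Compiled components can also be smoke-tested without a cluster. A minimal sketch, assuming kfp>=2.5 (which introduced the kfp.local runner) and a local data.csv with a target column:
from kfp import local

# Execute components as subprocesses on this machine (assumption: kfp>=2.5).
local.init(runner=local.SubprocessRunner())

# After local.init, calling a component runs it immediately.
preprocess_task = preprocess_data(input_path="data.csv")
print(preprocess_task.outputs["output_path"])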
3. Writing Components
3.1 Python Function Components
"""
Python function-based components
"""
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

# Basic component
@dsl.component(
    base_image="python:3.9-slim",
    packages_to_install=["pandas", "scikit-learn"]
)
def train_sklearn_model(
    training_data: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 5
):
    """Train a scikit-learn model"""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    # Load the data
    df = pd.read_csv(training_data.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    # Train the model
    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    clf.fit(X, y)

    # Cross-validation
    cv_scores = cross_val_score(clf, X, y, cv=5)

    # Save the model
    joblib.dump(clf, model.path)

    # Log metrics
    metrics.log_metric("cv_mean", float(cv_scores.mean()))
    metrics.log_metric("cv_std", float(cv_scores.std()))
    metrics.log_metric("n_features", int(X.shape[1]))

# Component that uses a GPU
@dsl.component(
    base_image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
    packages_to_install=["transformers"]
)
def train_pytorch_model(
    data: Input[Dataset],
    model: Output[Model],
    epochs: int = 10,
    learning_rate: float = 0.001
):
    """Train a PyTorch model (GPU)"""
    import torch
    # GPU training code...
    pass
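Because training_data is typed Input[Dataset], the pipeline must pass an artifact rather than a plain string; an existing file in object storage is typically wrapped with dsl.importer. A minimal sketch wiring it to the component above (the URI is a placeholder):
@dsl.pipeline(name="sklearn-training-pipeline")
def sklearn_training_pipeline(data_uri: str = "gs://bucket/train.csv"):
    # Turn an existing URI into a Dataset artifact.
    importer_task = dsl.importer(
        artifact_uri=data_uri,
        artifact_class=Dataset,
        reimport=False
    )
    train_sklearn_model(
        training_data=importer_task.output,
        n_estimators=150
    )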
3.2 Container-Based Components
"""
Docker container-based components
"""
from kfp import dsl
from kfp.dsl import ContainerSpec

# Method 1: define the ContainerSpec directly
@dsl.container_component
def custom_training_component(
    data_path: str,
    output_path: dsl.OutputPath(str),
    epochs: int
):
    return ContainerSpec(
        image="gcr.io/my-project/training-image:latest",
        command=["python", "train.py"],
        args=[
            "--data-path", data_path,
            "--output-path", output_path,
            "--epochs", epochs
        ]
    )

# Method 2: define the component in YAML
component_yaml = """
name: Training Component
description: Custom training component
inputs:
- name: data_path
  type: String
- name: epochs
  type: Integer
  default: '10'
outputs:
- name: model_path
  type: String
implementation:
  container:
    image: gcr.io/my-project/training:latest
    command:
    - python
    - train.py
    args:
    - --data-path
    - {inputValue: data_path}
    - --epochs
    - {inputValue: epochs}
    - --output-path
    - {outputPath: model_path}
"""

# Load the component from YAML
from kfp.components import load_component_from_text
training_op = load_component_from_text(component_yaml)
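The loaded training_op behaves like any other component factory: its inputs are the names declared in the YAML, and its outputs are addressed the same way. A minimal usage sketch:
@dsl.pipeline(name="yaml-component-pipeline")
def yaml_component_pipeline(data_path: str = "gs://bucket/data.csv"):
    train_task = training_op(data_path=data_path, epochs=20)
    # The output declared in the YAML is referenced by its declared name
    # and would be passed on to a downstream step.
    model_path = train_task.outputs["model_path"]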
3.3 Reusable Components
"""
A library of reusable components
"""
from typing import List
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model

# components/data_processing.py
@dsl.component(packages_to_install=["pandas", "scikit-learn"])
def load_and_split_data(
    input_path: str,
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2,
    random_state: int = 42
):
    """Load and split the data"""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_path)
    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=random_state
    )
    train_df.to_csv(train_data.path, index=False)
    test_df.to_csv(test_data.path, index=False)

@dsl.component(packages_to_install=["pandas", "scikit-learn"])
def feature_engineering(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    numerical_features: List[str],
    categorical_features: List[str]
):
    """Feature engineering"""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler, LabelEncoder

    df = pd.read_csv(input_data.path)
    # Scale numerical features
    scaler = StandardScaler()
    df[numerical_features] = scaler.fit_transform(df[numerical_features])
    # Encode categorical features
    for col in categorical_features:
        le = LabelEncoder()
        df[col] = le.fit_transform(df[col])
    df.to_csv(output_data.path, index=False)
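These pieces compose into a pipeline by passing artifacts from one component's output to the next one's input. A minimal sketch (the feature column names are illustrative):
@dsl.pipeline(name="preprocessing-pipeline")
def preprocessing_pipeline(input_path: str = "gs://bucket/raw.csv"):
    split_task = load_and_split_data(input_path=input_path, test_size=0.25)
    feature_engineering(
        input_data=split_task.outputs["train_data"],
        numerical_features=["age", "income"],        # illustrative columns
        categorical_features=["gender", "city"]
    )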
4. Advanced Pipeline Features
4.1 Conditional Execution
"""
Conditional execution and branching
"""
from kfp import dsl

@dsl.component(packages_to_install=["pandas"])
def check_data_quality(data_path: str) -> bool:
    """Data quality check"""
    import pandas as pd

    df = pd.read_csv(data_path)
    # Quality check logic: no missing values anywhere
    return bool(df.isnull().sum().sum() == 0)

@dsl.component(packages_to_install=["pandas"])
def clean_data(data_path: str, output_path: dsl.OutputPath(str)):
    """Clean the data"""
    import pandas as pd

    df = pd.read_csv(data_path)
    df = df.dropna()
    df.to_csv(output_path, index=False)

@dsl.component
def train_model(data_path: str):
    """Train the model"""
    pass

@dsl.pipeline(name="conditional-pipeline")
def conditional_pipeline(data_path: str):
    # Check data quality
    quality_check = check_data_quality(data_path=data_path)
    # Conditional execution
    with dsl.Condition(quality_check.output == False, name="need-cleaning"):
        clean_task = clean_data(data_path=data_path)
        train_model(data_path=clean_task.outputs["output_path"])
    with dsl.Condition(quality_check.output == True, name="no-cleaning"):
        train_model(data_path=data_path)
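Recent KFP SDK releases (roughly 2.2 and later) deprecate dsl.Condition in favor of dsl.If/dsl.Else, which make the two branches above mutually exclusive by construction. A sketch of the same logic, assuming kfp>=2.2:
@dsl.pipeline(name="conditional-pipeline-v2")
def conditional_pipeline_v2(data_path: str):
    quality_check = check_data_quality(data_path=data_path)
    with dsl.If(quality_check.output == False, name="need-cleaning"):
        clean_task = clean_data(data_path=data_path)
        train_model(data_path=clean_task.outputs["output_path"])
    with dsl.Else():
        train_model(data_path=data_path)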
4.2 Parallel Loops
"""
Iterating with ParallelFor
"""
from kfp import dsl
from typing import List

@dsl.component
def train_with_params(
    data_path: str,
    params: dict
) -> float:
    """Train a model with the given hyperparameters"""
    # Training logic
    return 0.95

@dsl.component
def select_best_model(
    accuracies: List[float],
    param_sets: List[dict]
) -> dict:
    """Select the best-performing model"""
    best_idx = accuracies.index(max(accuracies))
    return param_sets[best_idx]

@dsl.pipeline(name="hyperparameter-search")
def hyperparameter_search_pipeline(data_path: str):
    # Hyperparameter combinations
    param_sets = [
        {"n_estimators": 50, "max_depth": 3},
        {"n_estimators": 100, "max_depth": 5},
        {"n_estimators": 200, "max_depth": 10}
    ]
    # Train in parallel
    with dsl.ParallelFor(param_sets) as params:
        train_task = train_with_params(
            data_path=data_path,
            params=params
        )
    # Collect the results (fan-in happens outside the ParallelFor;
    # see the dsl.Collected sketch below)
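Fan-in from a ParallelFor is done with dsl.Collected, which gathers each iteration's output into a list for a downstream task. A sketch of the collection step, assuming a KFP 2.x release with dsl.Collected support:
@dsl.pipeline(name="hyperparameter-search-fanin")
def hyperparameter_search_fanin(data_path: str):
    param_sets = [
        {"n_estimators": 50, "max_depth": 3},
        {"n_estimators": 100, "max_depth": 5}
    ]
    with dsl.ParallelFor(param_sets) as params:
        train_task = train_with_params(data_path=data_path, params=params)
    # dsl.Collected turns the per-iteration outputs into a single list.
    select_best_model(
        accuracies=dsl.Collected(train_task.output),
        param_sets=param_sets
    )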
4.3 Resource Configuration
"""
Configuring component resources
"""
from kfp import dsl
from kfp import kubernetes  # provided by the kfp-kubernetes extension package

@dsl.component
def gpu_training(data_path: str):
    """GPU training"""
    pass

@dsl.pipeline(name="resource-pipeline")
def resource_pipeline(data_path: str):
    # GPU training task
    train_task = gpu_training(data_path=data_path)

    # Resource requests and limits
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_request("2")
    train_task.set_memory_request("8Gi")

    # GPU settings
    kubernetes.add_node_selector(
        train_task,
        label_key="cloud.google.com/gke-accelerator",
        label_value="nvidia-tesla-t4"
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)

    # Environment variable
    train_task.set_env_variable("CUDA_VISIBLE_DEVICES", "0")

    # Volume mount
    kubernetes.mount_pvc(
        train_task,
        pvc_name="data-pvc",
        mount_path="/data"
    )
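The mounted data-pvc must already exist in the namespace. kfp-kubernetes can also provision one as a pipeline step with kubernetes.CreatePVC; a minimal sketch (the storage class name is cluster-specific and assumed here):
@dsl.pipeline(name="dynamic-pvc-pipeline")
def dynamic_pvc_pipeline(data_path: str):
    # Provision a PVC inside the pipeline.
    pvc = kubernetes.CreatePVC(
        pvc_name_suffix="-training-data",
        access_modes=["ReadWriteOnce"],
        size="10Gi",
        storage_class_name="standard"
    )
    train_task = gpu_training(data_path=data_path)
    kubernetes.mount_pvc(
        train_task,
        pvc_name=pvc.outputs["name"],
        mount_path="/data"
    )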
5. Kubernetes Integration
5.1 Secrets and ConfigMaps
"""
Wiring Kubernetes resources into tasks
"""
from kfp import dsl
from kfp import kubernetes

@dsl.component
def component_with_secrets():
    """A component that reads a Secret"""
    import os
    api_key = os.environ.get("API_KEY")
    # ...

@dsl.pipeline(name="k8s-resources-pipeline")
def k8s_pipeline():
    task = component_with_secrets()

    # Environment variables from a Secret
    kubernetes.use_secret_as_env(
        task,
        secret_name="api-credentials",
        secret_key_to_env={"api-key": "API_KEY"}
    )

    # Environment variables from a ConfigMap
    kubernetes.use_config_map_as_env(
        task,
        config_map_name="app-config",
        config_map_key_to_env={"setting": "APP_SETTING"}
    )

    # Mount a Secret as files
    kubernetes.use_secret_as_volume(
        task,
        secret_name="tls-certs",
        mount_path="/certs"
    )
5.2 Service Accounts
"""
Configuring the service account
"""
from kfp import dsl
from kfp import kubernetes

@dsl.component
def some_component():
    """Placeholder component"""
    pass

@dsl.pipeline(name="sa-pipeline")
def service_account_pipeline():
    task = some_component()

    # Use a specific service account
    kubernetes.set_service_account(
        task,
        service_account="ml-pipeline-sa"
    )

    # Image pull secrets
    kubernetes.set_image_pull_secrets(
        task,
        secret_names=["docker-registry-secret"]
    )
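A service account can also be attached when the run is submitted, in which case it applies to every pod in the run; the KFP client's service_account argument does this. A minimal sketch:
from kfp.client import Client

client = Client(host="http://kubeflow-pipelines-api:8888")
run = client.create_run_from_pipeline_package(
    pipeline_file="pipeline.yaml",
    run_name="sa-run-001",
    service_account="ml-pipeline-sa"  # applied to all pods in this run
)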
6. Running Pipelines
6.1 Running with the SDK
"""
Running a pipeline with the KFP SDK
"""
from kfp import compiler
from kfp.client import Client

# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="pipeline.yaml"
)

# Create a KFP client
client = Client(host="http://kubeflow-pipelines-api:8888")

# Create (or fetch) an experiment
experiment = client.create_experiment(
    name="my-experiment",
    description="ML training experiments"
)

# Submit the run
run = client.create_run_from_pipeline_package(
    pipeline_file="pipeline.yaml",
    experiment_id=experiment.experiment_id,
    run_name="training-run-001",
    arguments={
        "input_data_path": "gs://bucket/data.csv",
        "n_estimators": 200
    }
)
print(f"Run ID: {run.run_id}")
# The run is visible in the KFP UI, typically at <host>/#/runs/details/<run_id>

# Wait for the run to complete
client.wait_for_run_completion(run.run_id, timeout=3600)

# Check the run state
run_details = client.get_run(run.run_id)
print(f"Status: {run_details.state}")
6.2 Scheduling
"""
Scheduling pipelines
"""
from kfp.client import Client

client = Client(host="http://kubeflow-pipelines-api:8888")

# Create a recurring (cron) run
recurring_run = client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="daily-training",
    pipeline_package_path="pipeline.yaml",
    cron_expression="0 2 * * *",  # every day at 02:00
    max_concurrency=1,
    params={
        "input_data_path": "gs://bucket/daily_data/"
    }
)
print(f"Recurring Run ID: {recurring_run.recurring_run_id}")

# Disable the recurring run
client.disable_recurring_run(recurring_run.recurring_run_id)

# Re-enable the recurring run
client.enable_recurring_run(recurring_run.recurring_run_id)
6.3 Inspecting Run Results
"""
Querying run results and artifacts
"""
from kfp.client import Client

client = Client()

# Details for a specific run
run = client.get_run("run-id")
print(f"State: {run.state}")
print(f"Created: {run.created_at}")
print(f"Finished: {run.finished_at}")

# List runs
runs = client.list_runs(
    experiment_id=experiment.experiment_id,
    page_size=10,
    sort_by="created_at desc"
)
for r in runs.runs:
    print(f"{r.display_name}: {r.state}")

# Inspecting pipeline outputs
# (artifacts are usually stored in object storage such as GCS or S3)
7. A Complete Pipeline Example
"""
An end-to-end ML pipeline example
"""
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(packages_to_install=["pandas", "scikit-learn"])
def ingest_data(
    source_path: str,
    output_data: Output[Dataset]
):
    """Ingest the data"""
    import pandas as pd

    df = pd.read_csv(source_path)
    df.to_csv(output_data.path, index=False)

@dsl.component(packages_to_install=["pandas", "great-expectations"])
def validate_data(
    input_data: Input[Dataset],
    validation_report: Output[Dataset]
) -> bool:
    """Validate the data"""
    import pandas as pd

    df = pd.read_csv(input_data.path)
    # Validation logic: enough rows and under 10% missing values
    is_valid = bool(
        len(df) > 100 and
        df.isnull().sum().sum() / df.size < 0.1
    )
    # Save a validation report
    report = {"valid": is_valid, "rows": len(df)}
    pd.DataFrame([report]).to_csv(validation_report.path, index=False)
    return is_valid

@dsl.component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_and_evaluate(
    train_data: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100
) -> float:
    """Train and evaluate the model"""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    df = pd.read_csv(train_data.path)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    clf = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='macro')

    joblib.dump(clf, model.path)
    metrics.log_metric("accuracy", float(accuracy))
    metrics.log_metric("f1_score", float(f1))
    return float(accuracy)

@dsl.component
def deploy_model(
    model: Input[Model],
    accuracy: float,
    min_accuracy: float = 0.8
) -> str:
    """Deploy the model"""
    if accuracy < min_accuracy:
        return "Model not deployed: accuracy below threshold"
    # Deployment logic (e.g., push the model to serving infrastructure)
    return f"Model deployed with accuracy {accuracy}"

@dsl.pipeline(
    name="end-to-end-ml-pipeline",
    description="Complete ML pipeline with data validation, training, and deployment"
)
def e2e_ml_pipeline(
    data_source: str = "gs://bucket/data.csv",
    n_estimators: int = 100,
    min_accuracy: float = 0.8
):
    # 1. Ingest the data
    ingest_task = ingest_data(source_path=data_source)

    # 2. Validate the data
    validate_task = validate_data(
        input_data=ingest_task.outputs["output_data"]
    )

    # 3. Train only if validation passes (validate_data has multiple
    # outputs, so its return value is addressed as outputs["Output"])
    with dsl.Condition(validate_task.outputs["Output"] == True, name="data-valid"):
        train_task = train_and_evaluate(
            train_data=ingest_task.outputs["output_data"],
            n_estimators=n_estimators
        )
        # 4. Deploy
        deploy_model(
            model=train_task.outputs["model"],
            accuracy=train_task.outputs["Output"],
            min_accuracy=min_accuracy
        )

# Compile
compiler.Compiler().compile(
    pipeline_func=e2e_ml_pipeline,
    package_path="e2e_pipeline.yaml"
)
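As in section 6.1, the compiled package is submitted through the client; pipeline parameters such as min_accuracy can be overridden per run. A minimal sketch:
from kfp.client import Client

client = Client(host="http://kubeflow-pipelines-api:8888")
run = client.create_run_from_pipeline_package(
    pipeline_file="e2e_pipeline.yaml",
    run_name="e2e-run-001",
    arguments={
        "data_source": "gs://bucket/data.csv",
        "min_accuracy": 0.85
    }
)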
Exercises
Exercise 1: A Basic Pipeline
Write a three-step pipeline: load data -> preprocess -> train a model.
Exercise 2: Hyperparameter Search
Write a pipeline that uses ParallelFor to run several hyperparameter combinations in parallel.
Exercise 3: Scheduling
Set up a retraining pipeline that runs every Monday in the early morning.
Summary
| Concept | Description |
|---|---|
| Pipeline | DAG defining an ML workflow |
| Component | Individual step in a pipeline |
| @dsl.component | Python function component |
| @dsl.pipeline | Pipeline definition |
| dsl.Condition | Conditional execution |
| dsl.ParallelFor | Parallel loop execution |