MLflow Advanced
MLflow Advanced¶
1. MLflow Projects¶
MLflow Projects is a format for packaging reproducible ML code.
1.1 Project Structure¶
my_ml_project/
├── MLproject            # project definition file
├── conda.yaml           # Conda environment definition
├── requirements.txt     # pip dependencies (optional)
├── train.py             # training script
├── evaluate.py          # evaluation script
└── data/
    └── sample_data.csv
1.2 The MLproject File¶
# MLproject
name: churn-prediction
# Environment definition (3 options)
# Option 1: Conda
conda_env: conda.yaml
# Option 2: Docker
# docker_env:
# image: my-docker-image:latest
# Option 3: System (use current environment)
# python_env: python_env.yaml
# Entry points
entry_points:
main:
parameters:
data_path: {type: str, default: "data/train.csv"}
n_estimators: {type: int, default: 100}
max_depth: {type: int, default: 5}
learning_rate: {type: float, default: 0.1}
command: "python train.py --data-path {data_path} --n-estimators {n_estimators} --max-depth {max_depth} --learning-rate {learning_rate}"
evaluate:
parameters:
model_path: {type: str}
test_data: {type: str}
command: "python evaluate.py --model-path {model_path} --test-data {test_data}"
hyperparameter_search:
parameters:
n_trials: {type: int, default: 50}
command: "python hyperparam_search.py --n-trials {n_trials}"
1.3 conda.yaml¶
# conda.yaml
name: churn-prediction-env
channels:
- conda-forge
- defaults
dependencies:
- python=3.9
- pip
- scikit-learn=1.2.0
- pandas=1.5.0
- numpy=1.23.0
- pip:
- mlflow>=2.0
- xgboost>=1.7
1.4 Running a Project¶
# Run locally
mlflow run . -P n_estimators=200 -P max_depth=10
# Run directly from Git
mlflow run https://github.com/user/ml-project.git -P data_path=s3://bucket/data.csv
# Specific branch/tag
mlflow run https://github.com/user/ml-project.git --version main
# Run in a Docker environment
mlflow run . --env-manager docker
# Run a specific entry point
mlflow run . -e evaluate -P model_path=models/model.pkl -P test_data=data/test.csv
# Specify the experiment
mlflow run . --experiment-name "production-training"
1.5 train.py Example¶
"""
train.py - MLflow Project training script
"""
import argparse
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
def main(args):
    """Train a gradient-boosting churn model and log the run to MLflow.

    Args:
        args: argparse.Namespace with data_path, n_estimators, max_depth,
            and learning_rate attributes.
    """
    # Enable MLflow autologging for sklearn (params, metrics, model artifact).
    mlflow.sklearn.autolog()

    with mlflow.start_run():
        # Load the data; assumes a "target" column holds the label —
        # TODO confirm against the dataset schema.
        df = pd.read_csv(args.data_path)
        X = df.drop("target", axis=1)
        y = df["target"]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Log extra parameters that autolog does not capture.
        mlflow.log_param("data_path", args.data_path)
        mlflow.log_param("train_size", len(X_train))

        # Train the model.
        model = GradientBoostingClassifier(
            n_estimators=args.n_estimators,
            max_depth=args.max_depth,
            learning_rate=args.learning_rate,
            random_state=42
        )
        model.fit(X_train, y_train)

        # Evaluate on the held-out split and log macro-averaged metrics.
        y_pred = model.predict(X_test)
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, average='macro'),
            "recall": recall_score(y_test, y_pred, average='macro'),
            "f1": f1_score(y_test, y_pred, average='macro')
        }
        for name, value in metrics.items():
            mlflow.log_metric(name, value)

        print(f"Model trained with accuracy: {metrics['accuracy']:.4f}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-path", type=str, required=True)
    parser.add_argument("--n-estimators", type=int, default=100)
    parser.add_argument("--max-depth", type=int, default=5)
    parser.add_argument("--learning-rate", type=float, default=0.1)
    args = parser.parse_args()
    main(args)
2. MLflow Models¶
2.1 Model Flavors¶
"""
MLflow model flavors
"""
import mlflow
# Supported flavors
# Maps flavor name -> human-readable description.
# Original strings were mojibake-corrupted Korean and one literal was split
# across lines (a syntax error); rewritten in English.
flavors = {
    "sklearn": "scikit-learn models",
    "pytorch": "PyTorch models",
    "tensorflow": "TensorFlow/Keras models",
    "xgboost": "XGBoost models",
    "lightgbm": "LightGBM models",
    "catboost": "CatBoost models",
    "transformers": "HuggingFace Transformers",
    "langchain": "LangChain models",
    "onnx": "ONNX models",
    "pyfunc": "Python function (custom)"
}
2.2 Model Signatures¶
"""
Defining a model signature
"""
import mlflow
from mlflow.models.signature import ModelSignature, infer_signature
from mlflow.types.schema import Schema, ColSpec
# Method 1: infer the signature automatically from data and predictions.
signature = infer_signature(X_train, model.predict(X_train))

# Method 2: define the schema explicitly.
input_schema = Schema([
    ColSpec("double", "feature_1"),
    ColSpec("double", "feature_2"),
    ColSpec("string", "category")
])
output_schema = Schema([ColSpec("long", "prediction")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Include the signature (and an input example) when logging the model.
mlflow.sklearn.log_model(
    model,
    "model",
    signature=signature,
    input_example=X_train[:5]  # input example
)
2.3 Custom Models (pyfunc)¶
"""
Custom MLflow model (pyfunc)
"""
import mlflow
import mlflow.pyfunc
import pandas as pd
class CustomModel(mlflow.pyfunc.PythonModel):
    """Custom MLflow pyfunc model: preprocessing + thresholded classification."""

    def __init__(self, preprocessor, model, threshold=0.5):
        # preprocessor: fitted transformer exposing .transform()
        # model: fitted classifier exposing .predict_proba()
        # threshold: probability cut-off for the positive class
        self.preprocessor = preprocessor
        self.model = model
        self.threshold = threshold

    def load_context(self, context):
        """Load artifacts; extra files are available via context.artifacts."""
        import joblib
        # Additional files could be loaded here from context.artifacts.
        pass

    def predict(self, context, model_input: pd.DataFrame) -> pd.DataFrame:
        """Run prediction: preprocess, score, then apply the threshold."""
        # Preprocess the raw input.
        processed = self.preprocessor.transform(model_input)
        # Positive-class probabilities.
        probabilities = self.model.predict_proba(processed)[:, 1]
        # Post-process: apply the decision threshold.
        predictions = (probabilities >= self.threshold).astype(int)
        return pd.DataFrame({
            "prediction": predictions,
            "probability": probabilities
        })
# Save the custom model.
custom_model = CustomModel(preprocessor, trained_model, threshold=0.6)

# Conda environment definition.
conda_env = {
    "channels": ["conda-forge"],
    "dependencies": [
        "python=3.9",
        "pip",
        {"pip": ["mlflow", "scikit-learn", "pandas"]}
    ],
    "name": "custom_model_env"
}

with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=custom_model,
        conda_env=conda_env,
        artifacts={
            "preprocessor": "artifacts/preprocessor.pkl",
            "config": "artifacts/config.yaml"
        },
        signature=signature,
        input_example=sample_input
    )
2.4 Model Storage Format¶
model/
├── MLmodel                # model metadata
├── model.pkl              # serialized model
├── conda.yaml             # Conda environment
├── python_env.yaml        # Python environment
├── requirements.txt       # pip dependencies
├── input_example.json     # input example
└── registered_model_meta  # registry metadata
# Contents of the MLmodel file
artifact_path: model
flavors:
python_function:
env:
conda: conda.yaml
virtualenv: python_env.yaml
loader_module: mlflow.sklearn
model_path: model.pkl
predict_fn: predict
python_version: 3.9.0
sklearn:
code: null
pickled_model: model.pkl
serialization_format: cloudpickle
sklearn_version: 1.2.0
mlflow_version: 2.8.0
model_uuid: a1b2c3d4-e5f6-7890-abcd-ef1234567890
signature:
inputs: '[{"name": "feature_1", "type": "double"}, ...]'
outputs: '[{"type": "long"}]'
3. Model Registry¶
3.1 Registering Models¶
"""
Using the Model Registry
"""
import mlflow
from mlflow.tracking import MlflowClient
# Method 1: register directly while logging the model.
with mlflow.start_run():
    # ... training ...
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="ChurnPredictionModel"  # auto-register
    )

# Method 2: register a model from an existing run.
result = mlflow.register_model(
    model_uri="runs:/RUN_ID/model",
    name="ChurnPredictionModel"
)
print(f"Version: {result.version}")

# Method 3: use MlflowClient.
client = MlflowClient()
client.create_registered_model(
    name="ChurnPredictionModel",
    description="Customer churn prediction model",
    tags={"team": "ML", "project": "retention"}
)

# Add a version.
client.create_model_version(
    name="ChurnPredictionModel",
    source="runs:/RUN_ID/model",
    run_id="RUN_ID",
    description="Initial version with RF"
)
3.2 Managing Model Stages¶
"""
Transitioning model stages
"""
from mlflow.tracking import MlflowClient
client = MlflowClient()

# Stage values: None, Staging, Production, Archived.

# Transition to Staging.
client.transition_model_version_stage(
    name="ChurnPredictionModel",
    version=1,
    stage="Staging",
    archive_existing_versions=False
)

# Promote to Production.
client.transition_model_version_stage(
    name="ChurnPredictionModel",
    version=1,
    stage="Production",
    archive_existing_versions=True  # auto-archive existing Production versions
)

# Load models by stage.
staging_model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Staging")
prod_model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Production")

# Load a specific version.
model_v1 = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/1")
3.3 Managing Model Metadata¶
"""
Model version metadata
"""
from mlflow.tracking import MlflowClient
client = MlflowClient()

# Update the registered model's description.
client.update_registered_model(
    name="ChurnPredictionModel",
    description="Updated description"
)

# Update a version's description.
client.update_model_version(
    name="ChurnPredictionModel",
    version=1,
    description="Improved feature engineering"
)

# Add tags.
client.set_registered_model_tag(
    name="ChurnPredictionModel",
    key="task",
    value="binary_classification"
)
client.set_model_version_tag(
    name="ChurnPredictionModel",
    version=1,
    key="validated",
    value="true"
)

# Inspect the registered model.
model = client.get_registered_model("ChurnPredictionModel")
print(f"Name: {model.name}")
print(f"Description: {model.description}")
print(f"Latest versions: {model.latest_versions}")

# Inspect a specific version.
version = client.get_model_version("ChurnPredictionModel", 1)
print(f"Version: {version.version}")
print(f"Stage: {version.current_stage}")
print(f"Source: {version.source}")
3.4 Searching Models¶
"""
Searching registered models
"""
from mlflow.tracking import MlflowClient
client = MlflowClient()

# List all registered models.
models = client.search_registered_models()
for m in models:
    print(f"Model: {m.name}, Latest: {m.latest_versions}")

# Filtered search.
models = client.search_registered_models(
    filter_string="name LIKE '%Churn%'"
)

# Search model versions.
versions = client.search_model_versions(
    filter_string="name='ChurnPredictionModel' and current_stage='Production'"
)
for v in versions:
    print(f"Version {v.version}: {v.current_stage}")
4. MLflow Serving¶
4.1 Local Serving¶
# Serve a model (by run ID)
mlflow models serve -m "runs:/RUN_ID/model" -p 5001 --no-conda
# Serve a model (from the Registry)
mlflow models serve -m "models:/ChurnPredictionModel/Production" -p 5001
# Environment options
mlflow models serve -m "models:/MyModel/1" \
--env-manager local \
--host 0.0.0.0 \
--port 5001
4.2 Calling the REST API¶
"""
Calling the MLflow serving API
"""
import requests
import json
# Serving endpoint.
url = "http://localhost:5001/invocations"

# Input payloads (several formats are supported).
# Format 1: split orientation
data_split = {
    "dataframe_split": {
        "columns": ["feature_1", "feature_2", "feature_3"],
        "data": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
    }
}

# Format 2: records orientation
data_records = {
    "dataframe_records": [
        {"feature_1": 1.0, "feature_2": 2.0, "feature_3": 3.0},
        {"feature_1": 4.0, "feature_2": 5.0, "feature_3": 6.0}
    ]
}

# Format 3: instances (TensorFlow Serving compatible)
data_instances = {
    "instances": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
}

# Call the API.
response = requests.post(
    url,
    headers={"Content-Type": "application/json"},
    data=json.dumps(data_split)
)
print(f"Status: {response.status_code}")
print(f"Predictions: {response.json()}")
4.3 Building a Docker Image¶
# Build a Docker image
mlflow models build-docker \
-m "models:/ChurnPredictionModel/Production" \
-n "churn-model:latest"
# Run the image
docker run -p 5001:8080 churn-model:latest
# Generate a Dockerfile directly
mlflow models generate-dockerfile \
-m "models:/ChurnPredictionModel/Production" \
-d ./docker-build
4.4 Batch Inference¶
"""
Batch inference example
"""
import mlflow
import pandas as pd
# Load the Production model from the registry.
model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Production")
# Load the large batch dataset.
batch_data = pd.read_parquet("s3://bucket/batch_data.parquet")
# Batch prediction.
predictions = model.predict(batch_data)
# Save the results alongside the input rows.
results = batch_data.copy()
results["prediction"] = predictions
results.to_parquet("s3://bucket/predictions.parquet")
5. Advanced Setup¶
5.1 Remote Tracking Server¶
# PostgreSQL backend + S3 artifact store
mlflow server \
--backend-store-uri postgresql://user:password@host:5432/mlflow \
--default-artifact-root s3://mlflow-artifacts/ \
--host 0.0.0.0 \
--port 5000
# Set environment variables
export MLFLOW_TRACKING_URI=http://mlflow-server:5000
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret
5.2 Authentication Setup¶
"""
MLflow authentication setup
"""
import os
import mlflow
# Basic (username/password) authentication.
os.environ["MLFLOW_TRACKING_USERNAME"] = "user"
os.environ["MLFLOW_TRACKING_PASSWORD"] = "password"
# Token-based authentication.
os.environ["MLFLOW_TRACKING_TOKEN"] = "your-token"
# Azure ML integration.
os.environ["AZURE_TENANT_ID"] = "tenant-id"
os.environ["AZURE_CLIENT_ID"] = "client-id"
os.environ["AZURE_CLIENT_SECRET"] = "client-secret"
5.3 Using Plugins¶
"""
MLflow plugin examples
"""
# Databricks plugin.
# pip install databricks-cli
import mlflow
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Users/user@email.com/my-experiment")
# Google Cloud plugin.
# pip install mlflow[google-cloud]
mlflow.set_tracking_uri("gs://bucket/mlflow")
6. End-to-End Workflow¶
"""
Complete MLflow workflow example
"""
import mlflow
from mlflow.tracking import MlflowClient
# 1. Configuration.
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("production-churn-model")
client = MlflowClient()

# 2. Train and track the run.
with mlflow.start_run(run_name="rf-optimized") as run:
    # ... training code ...
    mlflow.sklearn.log_model(model, "model", signature=signature)
    run_id = run.info.run_id

# 3. Register the model.
model_version = mlflow.register_model(
    f"runs:/{run_id}/model",
    "ChurnPredictionModel"
)

# 4. Transition to Staging.
client.transition_model_version_stage(
    name="ChurnPredictionModel",
    version=model_version.version,
    stage="Staging"
)

# 5. Test the Staging model.
staging_model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Staging")
test_results = evaluate_model(staging_model, test_data)

# 6. Promote to Production if the quality gate passes.
if test_results["accuracy"] > 0.9:
    client.transition_model_version_stage(
        name="ChurnPredictionModel",
        version=model_version.version,
        stage="Production",
        archive_existing_versions=True
    )
    print(f"Model v{model_version.version} promoted to Production!")
Exercises¶
Exercise 1: Create an MLflow Project¶
Create a complete MLflow Project and run it locally.
Exercise 2: Custom pyfunc Model¶
Write a custom pyfunc model that includes pre- and post-processing.
Exercise 3: Model Registry Workflow¶
Register a model and automate the Staging -> Production transition.
Summary¶
| Feature | Description |
|---|---|
| MLflow Projects | Reproducible code packaging |
| MLflow Models | Standardized model format |
| Model Registry | Model versioning and stage management |
| MLflow Serving | REST API serving |
| pyfunc | Custom model wrapper |