MLflow Advanced

MLflow Advanced

1. MLflow Projects

MLflow Projects is a packaging format for reproducible ML code.

1.1 Project Structure

my_ml_project/
├── MLproject              # Project definition file
├── conda.yaml             # Conda environment definition
├── requirements.txt       # pip dependencies (optional)
├── train.py               # Training script
├── evaluate.py            # Evaluation script
└── data/
    └── sample_data.csv

1.2 MLproject File

# MLproject
name: churn-prediction

# Environment definition (3 options; declare at most one)
# Option 1: Conda
conda_env: conda.yaml

# Option 2: Docker
# docker_env:
#   image: my-docker-image:latest

# Option 3: virtualenv (python_env.yaml declares the Python version and pip dependencies)
# To reuse the current environment instead, run with `mlflow run --env-manager local`.
# python_env: python_env.yaml

# Entry points
# Each entry point declares its parameters and the command template they are
# substituted into via {parameter_name} placeholders.
entry_points:
  main:
    parameters:
      data_path: {type: str, default: "data/train.csv"}
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 5}
      learning_rate: {type: float, default: 0.1}
    command: "python train.py --data-path {data_path} --n-estimators {n_estimators} --max-depth {max_depth} --learning-rate {learning_rate}"

  # Parameters without a default (model_path, test_data) must be supplied with -P
  evaluate:
    parameters:
      model_path: {type: str}
      test_data: {type: str}
    command: "python evaluate.py --model-path {model_path} --test-data {test_data}"

  # NOTE(review): hyperparam_search.py is not listed in the project structure above — confirm it exists
  hyperparameter_search:
    parameters:
      n_trials: {type: int, default: 50}
    command: "python hyperparam_search.py --n-trials {n_trials}"

1.3 conda.yaml

# conda.yaml
# Conda environment referenced by `conda_env` in the MLproject file.
name: churn-prediction-env
channels:
  - conda-forge
  - defaults
dependencies:
  # Conda-managed packages, pinned to exact versions
  - python=3.9
  - pip
  - scikit-learn=1.2.0
  - pandas=1.5.0
  - numpy=1.23.0
  # pip-managed packages, given as minimum versions
  - pip:
    - mlflow>=2.0
    - xgboost>=1.7

1.4 Running Projects

# Run locally
mlflow run . -P n_estimators=200 -P max_depth=10

# Run directly from Git
mlflow run https://github.com/user/ml-project.git -P data_path=s3://bucket/data.csv

# Specific branch/tag
mlflow run https://github.com/user/ml-project.git --version main

# Run in Docker environment
# Docker is selected by a `docker_env` section in the MLproject file, not by
# --env-manager (which takes values such as local, virtualenv or conda).
# With `docker_env` enabled in MLproject, a plain run builds and uses the image:
mlflow run .

# Run specific entry point
mlflow run . -e evaluate -P model_path=models/model.pkl -P test_data=data/test.csv

# Specify experiment
mlflow run . --experiment-name "production-training"

1.5 train.py Example

"""
train.py - MLflow Project Training Script
"""

import argparse
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def main(args):
    """Train a gradient-boosting classifier on a CSV dataset inside an MLflow run.

    The CSV is split 80/20 into train and test sets, the model is fitted on
    the training part, and accuracy plus macro-averaged precision, recall and
    F1 on the test part are logged to the active run.

    Args:
        args: Namespace providing ``data_path`` (CSV file with a ``target``
            column), ``n_estimators``, ``max_depth`` and ``learning_rate``.

    Raises:
        KeyError: If the loaded CSV has no ``target`` column.
    """
    # Enable scikit-learn autologging before the estimator is fitted
    mlflow.sklearn.autolog()

    with mlflow.start_run():
        # Load data
        df = pd.read_csv(args.data_path)
        if "target" not in df.columns:
            # Same exception type the bare drop() would raise, with an actionable message
            raise KeyError(
                f"'target' column not found in {args.data_path}; "
                f"available columns: {list(df.columns)}"
            )
        X = df.drop(columns="target")
        y = df["target"]

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Log additional parameters in a single batched call
        mlflow.log_params(
            {
                "data_path": args.data_path,
                "train_size": len(X_train),
            }
        )

        # Train model
        model = GradientBoostingClassifier(
            n_estimators=args.n_estimators,
            max_depth=args.max_depth,
            learning_rate=args.learning_rate,
            random_state=42,
        )
        model.fit(X_train, y_train)

        # Evaluate on the held-out split
        y_pred = model.predict(X_test)
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, average='macro'),
            "recall": recall_score(y_test, y_pred, average='macro'),
            "f1": f1_score(y_test, y_pred, average='macro'),
        }

        # One batched call instead of one request per metric
        mlflow.log_metrics(metrics)

        print(f"Model trained with accuracy: {metrics['accuracy']:.4f}")

if __name__ == "__main__":
    # Command-line interface; the flags match the ones used by the MLproject
    # `main` entry point command.
    cli = argparse.ArgumentParser()
    for flag, options in (
        ("--data-path", {"type": str, "required": True}),
        ("--n-estimators", {"type": int, "default": 100}),
        ("--max-depth", {"type": int, "default": 5}),
        ("--learning-rate", {"type": float, "default": 0.1}),
    ):
        cli.add_argument(flag, **options)
    main(cli.parse_args())

2. MLflow Models

2.1 Model Flavors

"""
MLflow Model Flavors
"""

import mlflow

# Supported flavors: flavor name -> short description of what it wraps
flavors = dict(
    [
        ("sklearn", "scikit-learn models"),
        ("pytorch", "PyTorch models"),
        ("tensorflow", "TensorFlow/Keras models"),
        ("xgboost", "XGBoost models"),
        ("lightgbm", "LightGBM models"),
        ("catboost", "CatBoost models"),
        ("transformers", "HuggingFace Transformers"),
        ("langchain", "LangChain models"),
        ("onnx", "ONNX models"),
        ("pyfunc", "Python functions (custom)"),
    ]
)

2.2 Model Signatures

"""
Defining Model Signatures
"""

import mlflow
from mlflow.models.signature import ModelSignature, infer_signature
from mlflow.types.schema import Schema, ColSpec

# Method 1: Automatic inference
# X_train and model are not defined in this snippet; they are expected to
# come from earlier training code.
signature = infer_signature(X_train, model.predict(X_train))

# Method 2: Explicit definition
# NOTE: this rebinds `signature`, so the inferred signature from Method 1 is
# replaced; keep only one of the two methods in real code.
input_schema = Schema([
    ColSpec("double", "feature_1"),
    ColSpec("double", "feature_2"),
    ColSpec("string", "category")
])
output_schema = Schema([ColSpec("long", "prediction")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Include signature when saving model
mlflow.sklearn.log_model(
    model,
    "model",
    signature=signature,
    input_example=X_train[:5]  # Input example: first five training rows
)

2.3 Custom Models (pyfunc)

"""
Custom MLflow Model (pyfunc)
"""

import mlflow
import mlflow.pyfunc
import pandas as pd

class CustomModel(mlflow.pyfunc.PythonModel):
    """Custom MLflow model: preprocess, score, then apply a decision threshold.

    The wrapped ``preprocessor`` must expose ``transform`` and the wrapped
    ``model`` must expose ``predict_proba``; both are stored on the instance.
    """

    def __init__(self, preprocessor, model, threshold=0.5):
        """Store the pipeline parts and the decision threshold.

        Args:
            preprocessor: Object whose ``transform`` is applied to the input.
            model: Classifier whose ``predict_proba`` scores the transformed input.
            threshold: Probability at or above which the prediction is 1.

        Raises:
            ValueError: If ``threshold`` is outside ``[0, 1]``.
        """
        # A threshold outside [0, 1] would silently force every prediction to 0 or 1
        if not 0.0 <= threshold <= 1.0:
            raise ValueError(f"threshold must be within [0, 1], got {threshold!r}")
        self.preprocessor = preprocessor
        self.model = model
        self.threshold = threshold

    def load_context(self, context):
        """Load artifacts.

        Intentionally a no-op here: additional files can be loaded from
        ``context.artifacts`` if the model needs them.
        """

    def predict(self, context, model_input: pd.DataFrame) -> pd.DataFrame:
        """Perform prediction.

        Args:
            context: MLflow-provided context (unused).
            model_input: Raw input rows to score.

        Returns:
            DataFrame with a ``prediction`` column (0/1) and a ``probability``
            column holding the score taken from column 1 of ``predict_proba``.
        """
        # Preprocessing
        processed = self.preprocessor.transform(model_input)

        # Prediction: column 1 of predict_proba
        # (presumably the positive class of a binary classifier — confirm)
        probabilities = self.model.predict_proba(processed)[:, 1]

        # Post-processing (apply threshold)
        predictions = (probabilities >= self.threshold).astype(int)

        return pd.DataFrame({
            "prediction": predictions,
            "probability": probabilities
        })


# Wrap the fitted preprocessor and estimator with a 0.6 decision threshold
custom_model = CustomModel(preprocessor, trained_model, threshold=0.6)

# Conda environment recorded alongside the model
pip_packages = ["mlflow", "scikit-learn", "pandas"]
conda_env = dict(
    channels=["conda-forge"],
    dependencies=["python=3.9", "pip", {"pip": pip_packages}],
    name="custom_model_env",
)

# Extra files bundled with the model, keyed by artifact name
model_artifacts = {
    "preprocessor": "artifacts/preprocessor.pkl",
    "config": "artifacts/config.yaml",
}

with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=custom_model,
        conda_env=conda_env,
        artifacts=model_artifacts,
        signature=signature,
        input_example=sample_input,
    )

2.4 Model Format Structure

model/
├── MLmodel                    # Model metadata
├── model.pkl                  # Serialized model
├── conda.yaml                 # Conda environment
├── python_env.yaml            # Python environment
├── requirements.txt           # pip dependencies
├── input_example.json         # Input example
└── registered_model_meta      # Registry metadata
# MLmodel file contents
# Metadata file stored at the root of the model directory shown above.
artifact_path: model
flavors:
  # Generic Python-function flavor: environment files plus the loader to use
  python_function:
    env:
      conda: conda.yaml
      virtualenv: python_env.yaml
    loader_module: mlflow.sklearn
    model_path: model.pkl
    predict_fn: predict
    python_version: 3.9.0
  # Native scikit-learn flavor: points at the same serialized model file
  sklearn:
    code: null
    pickled_model: model.pkl
    serialization_format: cloudpickle
    sklearn_version: 1.2.0
mlflow_version: 2.8.0
model_uuid: a1b2c3d4-e5f6-7890-abcd-ef1234567890
# Signature schemas are stored as JSON strings ("..." marks elided columns)
signature:
  inputs: '[{"name": "feature_1", "type": "double"}, ...]'
  outputs: '[{"type": "long"}]'

3. Model Registry

3.1 Model Registration

"""
Using Model Registry
"""

import mlflow
from mlflow.tracking import MlflowClient

# Method 1: Register directly when logging model
# (`model` is not defined in this snippet; it comes from the training step.)
with mlflow.start_run():
    # Training...
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="ChurnPredictionModel"  # Auto-register
    )

# Method 2: Register from existing run
# RUN_ID is a placeholder for the ID of the run that logged the model.
result = mlflow.register_model(
    model_uri="runs:/RUN_ID/model",
    name="ChurnPredictionModel"
)
print(f"Version: {result.version}")

# Method 3: Using MlflowClient
# NOTE(review): Methods 1-3 are alternatives that all target the same model
# name; presumably creating it again after Method 1 or 2 fails — confirm.
client = MlflowClient()
client.create_registered_model(
    name="ChurnPredictionModel",
    description="Customer churn prediction model",
    tags={"team": "ML", "project": "retention"}
)

# Add version
client.create_model_version(
    name="ChurnPredictionModel",
    source="runs:/RUN_ID/model",
    run_id="RUN_ID",
    description="Initial version with RF"
)

3.2 Model Stage Management

"""
Model Stage Transitions
"""

import mlflow  # required for mlflow.pyfunc.load_model below
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Stages: None, Staging, Production, Archived
# NOTE: model stages and transition_model_version_stage are deprecated as of
# MLflow 2.9 in favor of model version aliases
# (client.set_registered_model_alias and "models:/<name>@<alias>" URIs).

# Transition to Staging
client.transition_model_version_stage(
    name="ChurnPredictionModel",
    version=1,
    stage="Staging",
    archive_existing_versions=False
)

# Promote to Production
client.transition_model_version_stage(
    name="ChurnPredictionModel",
    version=1,
    stage="Production",
    archive_existing_versions=True  # Auto-archive existing Production versions
)

# Load model (by stage)
staging_model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Staging")
prod_model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Production")

# Load specific version
model_v1 = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/1")

3.3 Model Metadata Management

"""
Model Version Metadata
"""

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Identifiers shared by every call below
MODEL_NAME = "ChurnPredictionModel"
MODEL_VERSION = 1

# Descriptions: the registered model first, then one specific version
client.update_registered_model(name=MODEL_NAME, description="Updated description")
client.update_model_version(
    name=MODEL_NAME,
    version=MODEL_VERSION,
    description="Improved feature engineering",
)

# Tags: one on the registered model, one on the version
client.set_registered_model_tag(
    name=MODEL_NAME, key="task", value="binary_classification"
)
client.set_model_version_tag(
    name=MODEL_NAME, version=MODEL_VERSION, key="validated", value="true"
)

# Read back the registered model
model = client.get_registered_model(MODEL_NAME)
print(
    f"Name: {model.name}",
    f"Description: {model.description}",
    f"Latest versions: {model.latest_versions}",
    sep="\n",
)

# Read back the single version
version = client.get_model_version(MODEL_NAME, MODEL_VERSION)
print(
    f"Version: {version.version}",
    f"Stage: {version.current_stage}",
    f"Source: {version.source}",
    sep="\n",
)
"""
Searching Registered Models
"""

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Get all models
models = client.search_registered_models()
for m in models:
    print(f"Model: {m.name}, Latest: {m.latest_versions}")

# Filtered search
models = client.search_registered_models(
    filter_string="name LIKE '%Churn%'"
)

# Search versions
# `current_stage` is not among the documented filter keys of
# search_model_versions (name, run_id, source_path, tags), so query by name
# and select the stage client-side.
versions = [
    v
    for v in client.search_model_versions(
        filter_string="name='ChurnPredictionModel'"
    )
    if v.current_stage == "Production"
]
for v in versions:
    print(f"Version {v.version}: {v.current_stage}")

4. MLflow Serving

4.1 Local Serving

# Serve model (run ID based), reusing the current Python environment
# (--env-manager local replaces the legacy --no-conda flag)
mlflow models serve -m "runs:/RUN_ID/model" -p 5001 --env-manager local

# Serve model (Registry based)
mlflow models serve -m "models:/ChurnPredictionModel/Production" -p 5001

# Environment options
mlflow models serve -m "models:/MyModel/1" \
    --env-manager local \
    --host 0.0.0.0 \
    --port 5001

4.2 REST API Calls

"""
Calling MLflow Serving API
"""

import requests
import json

# Endpoint
url = "http://localhost:5001/invocations"

# Input data (multiple formats supported)
# Format 1: split orientation
data_split = {
    "dataframe_split": {
        "columns": ["feature_1", "feature_2", "feature_3"],
        "data": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
    }
}

# Format 2: records orientation
data_records = {
    "dataframe_records": [
        {"feature_1": 1.0, "feature_2": 2.0, "feature_3": 3.0},
        {"feature_1": 4.0, "feature_2": 5.0, "feature_3": 6.0}
    ]
}

# Format 3: instances (TensorFlow Serving compatible)
data_instances = {
    "instances": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
}

# API call
response = requests.post(
    url,
    headers={"Content-Type": "application/json"},
    data=json.dumps(data_split),
    timeout=30,  # requests waits indefinitely by default; bound the wait
)

print(f"Status: {response.status_code}")
# Raise on 4xx/5xx so an error body is not parsed as if it were predictions
response.raise_for_status()
print(f"Predictions: {response.json()}")

4.3 Building Docker Images

# Create Docker image
# -m selects the model URI, -n names the resulting image
mlflow models build-docker \
    -m "models:/ChurnPredictionModel/Production" \
    -n "churn-model:latest"

# Run image
# Maps host port 5001 to port 8080 inside the container
docker run -p 5001:8080 churn-model:latest

# Generate Dockerfile directly
# -d is the output directory for the generated build context
mlflow models generate-dockerfile \
    -m "models:/ChurnPredictionModel/Production" \
    -d ./docker-build

4.4 Batch Inference

"""
Performing Batch Inference
"""

import mlflow
import pandas as pd

# Production model from the registry, loaded through the generic pyfunc interface
model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Production")

# Input batch
batch_data = pd.read_parquet("s3://bucket/batch_data.parquet")

# Score the whole batch in one call
predictions = model.predict(batch_data)

# Attach predictions to a copy of the input and persist the result
results = batch_data.assign(prediction=predictions)
results.to_parquet("s3://bucket/predictions.parquet")

5. Advanced Configuration

5.1 Remote Tracking Server

# PostgreSQL backend + S3 artifact store
# user, password, host and the bucket name are placeholders.
# NOTE(review): credentials embedded in the URI are visible in shell history
# and process listings — consider supplying them another way.
mlflow server \
    --backend-store-uri postgresql://user:password@host:5432/mlflow \
    --default-artifact-root s3://mlflow-artifacts/ \
    --host 0.0.0.0 \
    --port 5000

# Set environment variables
# Clients use MLFLOW_TRACKING_URI to find the server; the AWS variables are
# placeholder credentials for the S3 artifact store.
export MLFLOW_TRACKING_URI=http://mlflow-server:5000
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret

5.2 Authentication Setup

"""
MLflow Authentication Setup
"""

import os
import mlflow

# All values below are placeholders.
# NOTE(review): real credentials should come from a secret store or the
# process environment, not from literals in source code.

# Basic authentication
os.environ["MLFLOW_TRACKING_USERNAME"] = "user"
os.environ["MLFLOW_TRACKING_PASSWORD"] = "password"

# Token-based authentication
# (presumably an alternative to basic auth rather than an addition — confirm)
os.environ["MLFLOW_TRACKING_TOKEN"] = "your-token"

# Azure ML integration
os.environ["AZURE_TENANT_ID"] = "tenant-id"
os.environ["AZURE_CLIENT_ID"] = "client-id"
os.environ["AZURE_CLIENT_SECRET"] = "client-secret"

5.3 Using Plugins

"""
MLflow Plugin Examples
"""

# Databricks plugin
# pip install databricks-cli

import mlflow
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Users/user@email.com/my-experiment")

# Google Cloud Storage (artifact store)
# pip install google-cloud-storage
# A gs:// URI is an artifact location, not a tracking URI: keep the tracking
# URI pointed at a tracking server (or database) and store artifacts in the
# bucket, either per experiment as below or with the server's
# --default-artifact-root option.
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.create_experiment("gcs-experiment", artifact_location="gs://bucket/mlflow")

6. Complete Workflow

"""
Complete MLflow Workflow Example
"""

import mlflow
from mlflow.tracking import MlflowClient

# 1. Setup
MODEL_NAME = "ChurnPredictionModel"
ACCURACY_GATE = 0.9  # minimum Staging accuracy required for promotion

mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("production-churn-model")
client = MlflowClient()

# 2. Training and experimentation
with mlflow.start_run(run_name="rf-optimized") as run:
    # Training code...
    mlflow.sklearn.log_model(model, "model", signature=signature)
    run_id = run.info.run_id

# 3. Register the model logged by that run
model_version = mlflow.register_model(
    model_uri=f"runs:/{run_id}/model",
    name=MODEL_NAME,
)

# 4. Move the new version to Staging
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=model_version.version,
    stage="Staging",
)

# 5. Evaluate the Staging model
staging_model = mlflow.pyfunc.load_model("models:/ChurnPredictionModel/Staging")
test_results = evaluate_model(staging_model, test_data)

# 6. Promote to Production only when the accuracy gate is passed
passed_gate = test_results["accuracy"] > ACCURACY_GATE
if passed_gate:
    client.transition_model_version_stage(
        name=MODEL_NAME,
        version=model_version.version,
        stage="Production",
        archive_existing_versions=True,
    )
    print(f"Model v{model_version.version} promoted to Production!")

Exercises

Exercise 1: Create MLflow Project

Create a complete MLflow Project and run it locally.

Exercise 2: Custom pyfunc Model

Write a custom pyfunc model that includes preprocessing and post-processing.

Exercise 3: Model Registry Workflow

Automate model registration and Staging -> Production transitions.


Summary

| Feature | Description |
|---|---|
| MLflow Projects | Reproducible code packaging |
| MLflow Models | Standardized model format |
| Model Registry | Model version and stage management |
| MLflow Serving | REST API serving |
| pyfunc | Custom model wrapper |

References

to navigate between lessons