Feature Store
Feature Store¶
1. Feature Store Concepts¶
A Feature Store is a platform for centrally managing, storing, and serving ML features.
1.1 Role of a Feature Store¶
+----------------------------------------------------------------------+
|                     Feature Store Architecture                       |
+----------------------------------------------------------------------+
|                                                                      |
|   +-----------------------------------------------------------+     |
|   |                   Feature Engineering                     |     |
|   |                                                           |     |
|   |   Raw Data -> Transform -> Features -> Feature Store      |     |
|   +-----------------------------------------------------------+     |
|                             |                                       |
|                             v                                       |
|   +-----------------------------------------------------------+     |
|   |                     Feature Store                         |     |
|   |   +-------------------+      +-------------------+        |     |
|   |   |   Offline Store   |      |   Online Store    |        |     |
|   |   |                   |      |                   |        |     |
|   |   | - for training    |      | - for inference   |        |     |
|   |   | - batch jobs      |      | - low latency     |        |     |
|   |   | - large volume    |      | - key-value reads |        |     |
|   |   |                   |      |                   |        |     |
|   |   | (S3, BigQuery,    |      | (Redis, DynamoDB) |        |     |
|   |   |  Parquet)         |      |                   |        |     |
|   |   +-------------------+      +-------------------+        |     |
|   |                                                           |     |
|   |   +-------------------------------------------+           |     |
|   |   |            Feature Registry               |           |     |
|   |   |  - feature metadata                       |           |     |
|   |   |  - version management                     |           |     |
|   |   |  - schema definitions                     |           |     |
|   |   +-------------------------------------------+           |     |
|   +-----------------------------------------------------------+     |
|                |                          |                         |
|                v                          v                         |
|       +---------------+          +---------------+                  |
|       |   Training    |          |   Inference   |                  |
|       |   (Offline)   |          |   (Online)    |                  |
|       +---------------+          +---------------+                  |
|                                                                      |
+----------------------------------------------------------------------+
1.2 Benefits of a Feature Store¶
"""
Feature Store λμ
μ΄μ
"""
benefits = {
"μ¬μ¬μ©μ±": {
"description": "ν λ² μ μν νΌμ²λ₯Ό μ¬λ¬ λͺ¨λΈμμ μ¬μ¬μ©",
"example": "user_total_purchasesλ₯Ό μ΄ν μμΈ‘, μΆμ², μ¬κΈ° νμ§ λͺ¨λΈμμ λͺ¨λ μ¬μ©"
},
"μΌκ΄μ±": {
"description": "νμ΅κ³Ό μΆλ‘ μμ λμΌν νΌμ² κ³μ° 보μ₯",
"example": "νμ΅/μλΉ μ€ν(skew) λ°©μ§"
},
"μμ μ νμ±": {
"description": "νΉμ μμ μ νΌμ² κ°μ μ νν μ‘°ν",
"example": "μμΈ‘ μμ μ μ μ μμλ μ λ³΄λ§ μ¬μ© (λ°μ΄ν° λμ λ°©μ§)"
},
"νΌμ² κ²μ": {
"description": "μ€μ λ μ§μ€νΈλ¦¬μμ κΈ°μ‘΄ νΌμ² κ²μ",
"example": "ν κ° νΌμ² 곡μ λ° λ°κ²¬"
},
"κ±°λ²λμ€": {
"description": "νΌμ² λ²μ κ΄λ¦¬, μ κ·Ό μ μ΄, 리λμ§ μΆμ ",
"example": "κ·μ μ€μ, κ°μ¬ λμ"
}
}
2. Feast Overview¶
Feast is the most widely used open-source Feature Store.
2.1 Installation¶
# Install Feast
pip install feast
# Optional store-specific extras
pip install feast[redis] # Redis online store
pip install feast[gcp] # GCP support
pip install feast[aws] # AWS support
2.2 Project Structure¶
feature_repo/
├── feature_store.yaml         # project configuration
├── features/
│   ├── user_features.py       # user feature definitions
│   └── product_features.py    # product feature definitions
├── data/
│   └── user_data.parquet      # offline data
└── tests/
    └── test_features.py
2.3 Basic Configuration¶
# feature_store.yaml -- Feast project configuration
project: churn_prediction
registry: data/registry.db   # file-based feature registry
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file
entity_key_serialization_version: 2
3. Defining Features¶
3.1 Entities and Feature Views¶
"""
features/user_features.py - μ¬μ©μ νΌμ² μ μ
"""
from datetime import timedelta
from feast import Entity, Feature, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64, String
# Entity μ μ (νΌμ²μ ν€)
user = Entity(
name="user_id",
description="Customer ID",
value_type=ValueType.INT64
)
# λ°μ΄ν° μμ€ μ μ
user_source = FileSource(
path="data/user_features.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp"
)
# Feature View μ μ
user_features = FeatureView(
name="user_features",
entities=[user],
ttl=timedelta(days=1), # Time to Live
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_purchase_amount", dtype=Float32),
Field(name="days_since_last_purchase", dtype=Int64),
Field(name="customer_segment", dtype=String),
Field(name="tenure_months", dtype=Int64),
],
online=True, # μ¨λΌμΈ μ€ν μ΄ νμ±ν
source=user_source,
tags={
"team": "ml-platform",
"owner": "data-science"
}
)
# Derived (on-demand) features, computed at request time from other views.
from feast import on_demand_feature_view
import pandas as pd


@on_demand_feature_view(
    sources=[user_features],
    schema=[
        Field(name="purchase_frequency", dtype=Float32),
        Field(name="is_high_value", dtype=Int64),
    ],
)
def user_derived_features(inputs: pd.DataFrame) -> pd.DataFrame:
    """Compute request-time features from the base user features.

    Fix: mask tenure_months == 0 (brand-new users) before dividing so the
    frequency becomes NaN instead of +/-inf.
    """
    df = pd.DataFrame()
    tenure = inputs["tenure_months"].where(inputs["tenure_months"] != 0)
    df["purchase_frequency"] = inputs["total_purchases"] / tenure
    df["is_high_value"] = (inputs["avg_purchase_amount"] > 100).astype(int)
    return df
3.2 Advanced Feature Definitions¶
"""
features/transaction_features.py - νΈλμμ
νΌμ²
"""
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
from feast.aggregation import Aggregation
from datetime import timedelta
# νΈλμμ
μμ€
transaction_source = FileSource(
path="data/transactions.parquet",
timestamp_field="transaction_timestamp"
)
# νΈλμμ
νΌμ² λ·°
transaction_features = FeatureView(
name="transaction_features",
entities=[user],
ttl=timedelta(days=7),
schema=[
Field(name="transaction_amount", dtype=Float32),
Field(name="transaction_count_7d", dtype=Int64),
Field(name="avg_transaction_7d", dtype=Float32),
],
source=transaction_source,
online=True
)
# Sliding-window aggregations over a push-based stream (StreamFeatureView, Feast 0.26+).
from feast import StreamFeatureView, PushSource

# A PushSource lets a stream processor push rows directly into the store;
# the batch source is still used for backfills and historical retrieval.
push_source = PushSource(
    name="transaction_push_source",
    batch_source=transaction_source,
)

streaming_features = StreamFeatureView(
    name="transaction_streaming_features",
    entities=[user],
    ttl=timedelta(hours=1),
    schema=[
        Field(name="transaction_count_1h", dtype=Int64),
        Field(name="total_amount_1h", dtype=Float32),
    ],
    source=push_source,
    aggregations=[
        # Rolling 1-hour transaction count.
        Aggregation(
            column="transaction_amount",
            function="count",
            time_window=timedelta(hours=1),
        ),
        # Rolling 1-hour transaction total.
        Aggregation(
            column="transaction_amount",
            function="sum",
            time_window=timedelta(hours=1),
        ),
    ],
)
4. Online/Offline Stores¶
4.1 Applying the Feast Registry¶
# Apply feature definitions to the registry
feast apply
# Inspect currently registered objects
feast feature-views list
feast entities list
4.2 Offline Store (for training)¶
"""
μ€νλΌμΈ νΌμ² μ‘°ν - νμ΅ λ°μ΄ν° μμ±
"""
from feast import FeatureStore
import pandas as pd
# Feature Store μ΄κΈ°ν
store = FeatureStore(repo_path="./feature_repo")
# μν°ν° λ°μ΄ν° (νμ΅ν μν)
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003, 1004, 1005],
"event_timestamp": pd.to_datetime([
"2024-01-15 10:00:00",
"2024-01-15 11:00:00",
"2024-01-15 12:00:00",
"2024-01-15 13:00:00",
"2024-01-15 14:00:00"
])
})
# νμ€ν 리컬 νΌμ² μ‘°ν (Point-in-time μ ν)
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_features:total_purchases",
"user_features:avg_purchase_amount",
"user_features:days_since_last_purchase",
"user_features:tenure_months",
"user_derived_features:purchase_frequency",
"transaction_features:transaction_count_7d"
]
).to_df()
print(training_df.head())
print(f"Training data shape: {training_df.shape}")
# κ²°κ³Όλ₯Ό νμ΅μ μ¬μ©
# X = training_df.drop(["user_id", "event_timestamp"], axis=1)
# y = training_df["label"] # λ μ΄λΈμ λ³λλ‘ μ‘°μΈ
4.3 Online Store (for inference)¶
"""
μ¨λΌμΈ νΌμ² μ‘°ν - μ€μκ° μΆλ‘
"""
from feast import FeatureStore
store = FeatureStore(repo_path="./feature_repo")
# μ¨λΌμΈ μ€ν μ΄μ νΌμ² λ‘λ (materialization)
# μ€νλΌμΈ β μ¨λΌμΈ λκΈ°ν
store.materialize_incremental(end_date=datetime.now())
# λλ μ 체 κΈ°κ° μ¬λκΈ°ν
# store.materialize(
# start_date=datetime(2024, 1, 1),
# end_date=datetime.now()
# )
# μ¨λΌμΈ νΌμ² μ‘°ν (μ μ§μ°)
feature_vector = store.get_online_features(
features=[
"user_features:total_purchases",
"user_features:avg_purchase_amount",
"user_features:days_since_last_purchase",
"user_derived_features:purchase_frequency"
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002}
]
).to_dict()
print(feature_vector)
# {
# "user_id": [1001, 1002],
# "total_purchases": [45, 23],
# "avg_purchase_amount": [67.5, 89.2],
# ...
# }
4.4 Feature Server¶
# Start the feature server (REST API)
feast serve -p 6566

# Call the API
curl -X POST "http://localhost:6566/get-online-features" \
  -H "Content-Type: application/json" \
  -d '{
    "features": [
      "user_features:total_purchases",
      "user_features:avg_purchase_amount"
    ],
    "entities": {
      "user_id": [1001, 1002]
    }
  }'
5. Feature Serving Integration¶
5.1 Inference Pipeline Integration¶
"""
λͺ¨λΈ μΆλ‘ μ Feature Store ν΅ν©
"""
from feast import FeatureStore
import joblib
import numpy as np
class ModelWithFeatureStore:
"""Feature Store ν΅ν© λͺ¨λΈ μλ²"""
def __init__(self, model_path: str, feature_repo_path: str):
self.model = joblib.load(model_path)
self.store = FeatureStore(repo_path=feature_repo_path)
self.feature_list = [
"user_features:total_purchases",
"user_features:avg_purchase_amount",
"user_features:days_since_last_purchase",
"user_features:tenure_months",
"user_derived_features:purchase_frequency"
]
def predict(self, user_id: int) -> dict:
"""νΌμ² μ‘°ν ν μμΈ‘"""
# 1. Feature Storeμμ νΌμ² μ‘°ν
features = self.store.get_online_features(
features=self.feature_list,
entity_rows=[{"user_id": user_id}]
).to_dict()
# 2. νΌμ² λ²‘ν° μμ±
feature_names = [f.split(":")[1] for f in self.feature_list]
feature_vector = np.array([
[features[name][0] for name in feature_names]
])
# 3. μμΈ‘
prediction = self.model.predict(feature_vector)[0]
probability = self.model.predict_proba(feature_vector)[0]
return {
"user_id": user_id,
"prediction": int(prediction),
"probability": probability.tolist(),
"features_used": dict(zip(feature_names, feature_vector[0].tolist()))
}
# μ¬μ©
model_server = ModelWithFeatureStore(
model_path="models/churn_model.pkl",
feature_repo_path="./feature_repo"
)
result = model_server.predict(user_id=1001)
print(result)
5.2 FastAPI Integration¶
"""
FastAPI + Feature Store
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
import joblib
import numpy as np
app = FastAPI()
store = FeatureStore(repo_path="./feature_repo")
model = joblib.load("models/churn_model.pkl")
FEATURES = [
"user_features:total_purchases",
"user_features:avg_purchase_amount",
"user_features:tenure_months"
]
class PredictionRequest(BaseModel):
user_id: int
class PredictionResponse(BaseModel):
user_id: int
prediction: int
probability: list
features: dict
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""νΌμ² μ‘°ν λ° μμΈ‘"""
try:
# Feature Storeμμ μ‘°ν
features = store.get_online_features(
features=FEATURES,
entity_rows=[{"user_id": request.user_id}]
).to_dict()
# νΌμ² λ²‘ν° μμ±
feature_names = [f.split(":")[1] for f in FEATURES]
feature_vector = np.array([
[features[name][0] for name in feature_names]
])
# μμΈ‘
pred = model.predict(feature_vector)[0]
prob = model.predict_proba(feature_vector)[0]
return PredictionResponse(
user_id=request.user_id,
prediction=int(pred),
probability=prob.tolist(),
features={name: features[name][0] for name in feature_names}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
return {"status": "healthy"}
6. Feature Engineering Pipelines¶
6.1 Batch Feature Pipeline¶
"""
λ°°μΉ νΌμ² κ³μ° νμ΄νλΌμΈ
"""
import pandas as pd
from datetime import datetime, timedelta
class FeatureEngineeringPipeline:
"""νΌμ² μμ§λμ΄λ§ νμ΄νλΌμΈ"""
def __init__(self, raw_data_path: str, output_path: str):
self.raw_data_path = raw_data_path
self.output_path = output_path
def load_raw_data(self) -> pd.DataFrame:
"""μμ λ°μ΄ν° λ‘λ"""
return pd.read_parquet(self.raw_data_path)
def compute_user_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""μ¬μ©μ νΌμ² κ³μ°"""
user_features = df.groupby("user_id").agg({
"transaction_amount": ["count", "sum", "mean"],
"transaction_date": ["min", "max"]
}).reset_index()
user_features.columns = [
"user_id", "total_purchases", "total_amount",
"avg_purchase_amount", "first_purchase_date", "last_purchase_date"
]
# μΆκ° νΌμ² κ³μ°
today = datetime.now()
user_features["days_since_last_purchase"] = (
today - pd.to_datetime(user_features["last_purchase_date"])
).dt.days
user_features["tenure_months"] = (
pd.to_datetime(user_features["last_purchase_date"]) -
pd.to_datetime(user_features["first_purchase_date"])
).dt.days // 30
# νμμ€ν¬ν μΆκ°
user_features["event_timestamp"] = today
user_features["created_timestamp"] = today
return user_features
def validate_features(self, df: pd.DataFrame) -> bool:
"""νΌμ² κ²μ¦"""
# Null 체ν¬
if df.isnull().sum().sum() > 0:
print("Warning: Null values detected")
return False
# λ²μ 체ν¬
if (df["avg_purchase_amount"] < 0).any():
print("Warning: Negative values in avg_purchase_amount")
return False
return True
def save_features(self, df: pd.DataFrame):
"""νΌμ² μ μ₯"""
df.to_parquet(
self.output_path,
index=False,
engine="pyarrow"
)
print(f"Features saved to {self.output_path}")
def run(self):
"""νμ΄νλΌμΈ μ€ν"""
print("Loading raw data...")
raw_df = self.load_raw_data()
print("Computing features...")
features_df = self.compute_user_features(raw_df)
print("Validating features...")
if not self.validate_features(features_df):
raise ValueError("Feature validation failed")
print("Saving features...")
self.save_features(features_df)
return features_df
# Run the pipeline.
pipeline = FeatureEngineeringPipeline(
    raw_data_path="data/raw_transactions.parquet",
    output_path="feature_repo/data/user_features.parquet",
)
features = pipeline.run()

# Sync the freshly written features into the online store.
from feast import FeatureStore  # fix: FeatureStore was used here without an import

store = FeatureStore(repo_path="./feature_repo")
store.materialize_incremental(end_date=datetime.now())
6.2 Streaming Feature Updates¶
"""
μ€νΈλ¦¬λ° νΌμ² μ
λ°μ΄νΈ (Feast Push)
"""
from feast import FeatureStore
import pandas as pd
from datetime import datetime
store = FeatureStore(repo_path="./feature_repo")
def process_streaming_event(event: dict):
"""μ€μκ° μ΄λ²€νΈ μ²λ¦¬ λ° νΌμ² μ
λ°μ΄νΈ"""
# μ΄λ²€νΈμμ νΌμ² μΆμΆ
feature_df = pd.DataFrame([{
"user_id": event["user_id"],
"transaction_amount": event["amount"],
"transaction_timestamp": datetime.now()
}])
# Feature Storeμ νΈμ
store.push(
push_source_name="transaction_push_source",
df=feature_df
)
# Kafka Consumer μμ
# for message in kafka_consumer:
# event = json.loads(message.value)
# process_streaming_event(event)
Exercises¶
Problem 1: Feature Store Setup¶
Set up a Feast feature store and define basic features.
Problem 2: Generating Training Data¶
Use get_historical_features to build point-in-time-correct training data.
Problem 3: Online Serving¶
Configure the online store and integrate it with real-time inference.
Summary¶
| Component | Description | Use case |
|---|---|---|
| Offline Store | Large-scale historical data | Training data generation |
| Online Store | Low-latency key-value lookups | Real-time inference |
| Feature Registry | Feature metadata management | Feature discovery, governance |
| Materialization | Offline → online synchronization | Preparing features for serving |