Feature Store

1. Feature Store Concepts

A feature store is a platform for managing, storing, and serving ML features from a single central place.

1.1 The Role of a Feature Store

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     Feature Store Architecture                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€€
β”‚                                                                     β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚   β”‚                Feature Engineering                       β”‚      β”‚
β”‚   β”‚                                                          β”‚      β”‚
β”‚   β”‚   Raw Data β†’ Transform β†’ Features β†’ Feature Store        β”‚      β”‚
β”‚   β”‚                                                          β”‚      β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                          β”‚                                          β”‚
β”‚                          β–Ό                                          β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚   β”‚                  Feature Store                           β”‚      β”‚
β”‚   β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”‚      β”‚
β”‚   β”‚  β”‚  Offline Store     β”‚  β”‚  Online Store      β”‚          β”‚      β”‚
β”‚   β”‚  β”‚                    β”‚  β”‚                    β”‚          β”‚      β”‚
β”‚   β”‚  β”‚ - Training         β”‚  β”‚ - Inference        β”‚          β”‚      β”‚
β”‚   β”‚  β”‚ - Batch processing β”‚  β”‚ - Low latency      β”‚          β”‚      β”‚
β”‚   β”‚  β”‚ - Large volumes    β”‚  β”‚ - Key-value lookup β”‚          β”‚      β”‚
β”‚   β”‚  β”‚                    β”‚  β”‚                    β”‚          β”‚      β”‚
β”‚   β”‚  β”‚ (S3, BigQuery,     β”‚  β”‚ (Redis, DynamoDB)  β”‚          β”‚      β”‚
β”‚   β”‚  β”‚  Parquet)          β”‚  β”‚                    β”‚          β”‚      β”‚
β”‚   β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜          β”‚      β”‚
β”‚   β”‚                                                          β”‚      β”‚
β”‚   β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚      β”‚
β”‚   β”‚  β”‚               Feature Registry               β”‚        β”‚      β”‚
β”‚   β”‚  β”‚  - Feature metadata                          β”‚        β”‚      β”‚
β”‚   β”‚  β”‚  - Versioning                                β”‚        β”‚      β”‚
β”‚   β”‚  β”‚  - Schema definitions                        β”‚        β”‚      β”‚
β”‚   β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚      β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                          β”‚                                          β”‚
β”‚            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                            β”‚
β”‚            β–Ό                           β–Ό                            β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚   β”‚    Training     β”‚         β”‚    Inference    β”‚                   β”‚
β”‚   β”‚    (Offline)    β”‚         β”‚    (Online)     β”‚                   β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚                                                                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
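To make the split between the two stores concrete, here is a toy, in-memory sketch (not how Feast is implemented; the class and method names are hypothetical). The offline store keeps every timestamped row, the online store keeps only the latest value per entity key, and "materialization" copies the newest rows in a time range from one to the other.

```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ToyFeatureStore:
    """Hypothetical in-memory model of an offline store plus an online store."""
    # Offline store: full history of (entity_key, event_timestamp, features) rows
    offline: list = field(default_factory=list)
    # Online store: entity_key -> (event_timestamp, features), latest value only
    online: dict = field(default_factory=dict)

    def write_offline(self, key, ts: datetime, features: dict) -> None:
        self.offline.append((key, ts, features))

    def materialize(self, start: datetime, end: datetime) -> int:
        """Copy rows with start <= ts < end into the online store, keeping the newest per key."""
        written = 0
        for key, ts, features in sorted(self.offline, key=lambda row: row[1]):
            if start <= ts < end:
                current = self.online.get(key)
                if current is None or ts >= current[0]:
                    self.online[key] = (ts, features)
                    written += 1
        return written

    def get_online(self, key):
        """Key-value lookup that only sees the latest materialized features."""
        entry = self.online.get(key)
        return None if entry is None else entry[1]


store = ToyFeatureStore()
store.write_offline(1001, datetime(2024, 1, 1), {"total_purchases": 40})
store.write_offline(1001, datetime(2024, 1, 10), {"total_purchases": 45})
store.materialize(datetime(2024, 1, 1), datetime(2024, 2, 1))
print(store.get_online(1001))  # latest value only; the offline history still has both rows
```

Training reads the full history (offline), while inference only ever needs the current value per key (online), which is why the two stores use such different storage engines.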

1.2 Benefits of a Feature Store

"""
Feature Store λ„μž… 이점
"""

benefits = {
    "μž¬μ‚¬μš©μ„±": {
        "description": "ν•œ 번 μ •μ˜ν•œ ν”Όμ²˜λ₯Ό μ—¬λŸ¬ λͺ¨λΈμ—μ„œ μž¬μ‚¬μš©",
        "example": "user_total_purchasesλ₯Ό μ΄νƒˆ 예츑, μΆ”μ²œ, 사기 탐지 λͺ¨λΈμ—μ„œ λͺ¨λ‘ μ‚¬μš©"
    },
    "일관성": {
        "description": "ν•™μŠ΅κ³Ό μΆ”λ‘ μ—μ„œ λ™μΌν•œ ν”Όμ²˜ 계산 보μž₯",
        "example": "ν•™μŠ΅/μ„œλΉ™ 슀큐(skew) λ°©μ§€"
    },
    "μ‹œμ  μ •ν™•μ„±": {
        "description": "νŠΉμ • μ‹œμ μ˜ ν”Όμ²˜ 값을 μ •ν™•νžˆ 쑰회",
        "example": "예츑 μ‹œμ μ— μ•Œ 수 μžˆμ—ˆλ˜ μ •λ³΄λ§Œ μ‚¬μš© (데이터 λˆ„μˆ˜ λ°©μ§€)"
    },
    "ν”Όμ²˜ 검색": {
        "description": "쀑앙 λ ˆμ§€μŠ€νŠΈλ¦¬μ—μ„œ κΈ°μ‘΄ ν”Όμ²˜ 검색",
        "example": "νŒ€ κ°„ ν”Όμ²˜ 곡유 및 발견"
    },
    "κ±°λ²„λ„ŒμŠ€": {
        "description": "ν”Όμ²˜ 버전 관리, μ ‘κ·Ό μ œμ–΄, λ¦¬λ‹ˆμ§€ 좔적",
        "example": "규제 μ€€μˆ˜, 감사 λŒ€μ‘"
    }
}
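Point-in-time correctness is easiest to see for a single entity. The stdlib sketch below (hypothetical helper names, illustrative numbers) compares a naive "latest value" lookup with an as-of lookup that only returns values whose timestamp is at or before the prediction time; the naive version leaks a value recorded after the prediction into the training row.

```python
import bisect
from datetime import datetime

# Illustrative history for one user: (event_timestamp, total_purchases), sorted by time
history = [
    (datetime(2024, 1, 1), 40),
    (datetime(2024, 1, 10), 45),
    (datetime(2024, 1, 20), 52),
]


def latest_value(rows):
    """Naive lookup: ignores the prediction time, so it can leak future data."""
    return rows[-1][1]


def as_of_value(rows, as_of: datetime):
    """Point-in-time lookup: newest value with timestamp <= as_of, else None."""
    timestamps = [ts for ts, _ in rows]
    idx = bisect.bisect_right(timestamps, as_of)
    return rows[idx - 1][1] if idx > 0 else None


prediction_time = datetime(2024, 1, 15)
print(latest_value(history))                  # 52, recorded after 2024-01-15
print(as_of_value(history, prediction_time))  # 45, what was known at prediction time
```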

2. Feast Overview

Feast (short for "Feature Store") is one of the most widely used open-source feature stores.

2.1 Installation

# Install Feast
pip install feast

# Extras for specific storage backends
# (quoted so shells such as zsh do not expand the brackets)
pip install 'feast[redis]'    # Redis online store
pip install 'feast[gcp]'      # GCP support
pip install 'feast[aws]'      # AWS support
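Feast's Python API has changed noticeably between releases (section 3.2, for instance, marks `StreamFeatureView` as Feast 0.26+), so it is worth confirming which version is installed before following the examples. A minimal stdlib check using `importlib.metadata`; the helper name is hypothetical:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str = "feast") -> Optional[str]:
    """Return the installed version string of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


print(installed_version() or "feast is not installed")
```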

2.2 ν”„λ‘œμ νŠΈ ꡬ쑰

feature_repo/
β”œβ”€β”€ feature_store.yaml      # project configuration
β”œβ”€β”€ features/
β”‚   β”œβ”€β”€ user_features.py    # user feature definitions
β”‚   └── product_features.py # product feature definitions
β”œβ”€β”€ data/
β”‚   └── user_features.parquet   # offline data
└── tests/
    └── test_features.py

2.3 Basic Configuration

# feature_store.yaml
project: churn_prediction
registry: data/registry.db
provider: local

online_store:
  type: redis
  connection_string: "localhost:6379"

offline_store:
  type: file

entity_key_serialization_version: 2

3. ν”Όμ²˜ μ •μ˜

3.1 Entities and Feature Views

"""
features/user_features.py - μ‚¬μš©μž ν”Όμ²˜ μ •μ˜
"""

from datetime import timedelta
from feast import Entity, Feature, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64, String

# Entity μ •μ˜ (ν”Όμ²˜μ˜ ν‚€)
user = Entity(
    name="user_id",
    description="Customer ID",
    value_type=ValueType.INT64
)

# 데이터 μ†ŒμŠ€ μ •μ˜
user_source = FileSource(
    path="data/user_features.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp"
)

# Feature View μ •μ˜
user_features = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),  # Time to Live
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int64),
        Field(name="customer_segment", dtype=String),
        Field(name="tenure_months", dtype=Int64),
    ],
    online=True,   # 온라인 μŠ€ν† μ–΄ ν™œμ„±ν™”
    source=user_source,
    tags={
        "team": "ml-platform",
        "owner": "data-science"
    }
)

# 집계 ν”Όμ²˜ (On-Demand Feature)
from feast import on_demand_feature_view
import pandas as pd

@on_demand_feature_view(
    sources=[user_features],
    schema=[
        Field(name="purchase_frequency", dtype=Float32),
        Field(name="is_high_value", dtype=Int64),
    ]
)
def user_derived_features(inputs: pd.DataFrame) -> pd.DataFrame:
    """μ‹€μ‹œκ°„ 계산 ν”Όμ²˜"""
    df = pd.DataFrame()
    df["purchase_frequency"] = inputs["total_purchases"] / inputs["tenure_months"]
    df["is_high_value"] = (inputs["avg_purchase_amount"] > 100).astype(int)
    return df
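An on-demand feature view is essentially a pure function from input feature values to derived values, so its logic can be unit-tested without Feast. The stdlib sketch below restates the `user_derived_features` transformation for a single row; the function name is hypothetical, the threshold of 100 comes from the definition above, and treating `tenure_months == 0` as one month is an assumption made here to avoid division by zero.

```python
def derive_user_features(total_purchases: int, tenure_months: int,
                         avg_purchase_amount: float) -> dict:
    """Row-wise version of the purchase_frequency / is_high_value transformation."""
    # Assumption: tenure below one month is counted as one month
    months = max(tenure_months, 1)
    return {
        "purchase_frequency": total_purchases / months,
        "is_high_value": int(avg_purchase_amount > 100),
    }


print(derive_user_features(45, 9, 67.5))  # {'purchase_frequency': 5.0, 'is_high_value': 0}
```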

3.2 λ³΅μž‘ν•œ ν”Όμ²˜ μ •μ˜

"""
features/transaction_features.py - νŠΈλžœμž­μ…˜ ν”Όμ²˜
"""

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
from feast.aggregation import Aggregation
from datetime import timedelta

# νŠΈλžœμž­μ…˜ μ†ŒμŠ€
transaction_source = FileSource(
    path="data/transactions.parquet",
    timestamp_field="transaction_timestamp"
)

# νŠΈλžœμž­μ…˜ ν”Όμ²˜ λ·°
transaction_features = FeatureView(
    name="transaction_features",
    entities=[user],
    ttl=timedelta(days=7),
    schema=[
        Field(name="transaction_amount", dtype=Float32),
        Field(name="transaction_count_7d", dtype=Int64),
        Field(name="avg_transaction_7d", dtype=Float32),
    ],
    source=transaction_source,
    online=True
)

# μ‹œκ°„ μœˆλ„μš° 집계 (StreamFeatureView - Feast 0.26+)
from feast import StreamFeatureView, PushSource

push_source = PushSource(
    name="transaction_push_source",
    batch_source=transaction_source
)

streaming_features = StreamFeatureView(
    name="transaction_streaming_features",
    entities=[user],
    ttl=timedelta(hours=1),
    schema=[
        Field(name="transaction_count_1h", dtype=Int64),
        Field(name="total_amount_1h", dtype=Float32),
    ],
    source=push_source,
    aggregations=[
        Aggregation(
            column="transaction_amount",
            function="count",
            time_window=timedelta(hours=1)
        ),
        Aggregation(
            column="transaction_amount",
            function="sum",
            time_window=timedelta(hours=1)
        )
    ]
)
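The `count` and `sum` aggregations over a one-hour `time_window` amount to a sliding-window computation per user. The stdlib sketch below illustrates those semantics with a deque of recent events per user that evicts anything older than the window; it is not how Feast or a stream processor implements aggregation, the class name is hypothetical, and treating the window as `(now - window, now]` is an assumption.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class SlidingWindowAggregator:
    """Hypothetical per-user count/sum over the trailing `window`."""

    def __init__(self, window: timedelta = timedelta(hours=1)):
        self.window = window
        self.events = defaultdict(deque)  # user_id -> deque of (timestamp, amount)

    def add(self, user_id: int, ts: datetime, amount: float) -> None:
        # Assumes events for a user arrive in timestamp order
        self.events[user_id].append((ts, amount))

    def aggregate(self, user_id: int, now: datetime) -> dict:
        q = self.events[user_id]
        # Evict events outside the window (now - window, now]; assumes now moves forward
        while q and q[0][0] <= now - self.window:
            q.popleft()
        return {
            "transaction_count_1h": len(q),
            "total_amount_1h": sum(amount for _, amount in q),
        }


agg = SlidingWindowAggregator()
t0 = datetime(2024, 1, 15, 10, 0)
agg.add(1001, t0, 20.0)
agg.add(1001, t0 + timedelta(minutes=30), 30.0)
agg.add(1001, t0 + timedelta(minutes=50), 5.0)
print(agg.aggregate(1001, t0 + timedelta(minutes=70)))  # the 10:00 event has left the window
```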

4. 온라인/μ˜€ν”„λΌμΈ μŠ€ν† μ–΄

4.1 Applying Definitions to the Feast Registry

# ν”Όμ²˜ μ •μ˜λ₯Ό λ ˆμ§€μŠ€νŠΈλ¦¬μ— 적용
feast apply

# ν˜„μž¬ λ“±λ‘λœ ν”Όμ²˜ 확인
feast feature-views list
feast entities list

4.2 μ˜€ν”„λΌμΈ μŠ€ν† μ–΄ (ν•™μŠ΅μš©)

"""
μ˜€ν”„λΌμΈ ν”Όμ²˜ 쑰회 - ν•™μŠ΅ 데이터 생성
"""

from feast import FeatureStore
import pandas as pd

# Initialize the feature store
store = FeatureStore(repo_path="./feature_repo")

# Entity DataFrame: the training samples (entity key + timestamp of each example)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1004, 1005],
    "event_timestamp": pd.to_datetime([
        "2024-01-15 10:00:00",
        "2024-01-15 11:00:00",
        "2024-01-15 12:00:00",
        "2024-01-15 13:00:00",
        "2024-01-15 14:00:00"
    ])
})

# Historical feature retrieval (point-in-time correct)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_purchase_amount",
        "user_features:days_since_last_purchase",
        "user_features:tenure_months",
        "user_derived_features:purchase_frequency",
        "transaction_features:transaction_count_7d"
    ]
).to_df()

print(training_df.head())
print(f"Training data shape: {training_df.shape}")

# κ²°κ³Όλ₯Ό ν•™μŠ΅μ— μ‚¬μš©
# X = training_df.drop(["user_id", "event_timestamp"], axis=1)
# y = training_df["label"]  # λ ˆμ΄λΈ”μ€ λ³„λ„λ‘œ 쑰인
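Conceptually, `get_historical_features` performs an as-of join: for each row of `entity_df` it takes the newest feature row for the same `user_id` whose timestamp is not after that row's `event_timestamp`, and it ignores values older than the view's `ttl`. The stdlib sketch below is a simplified illustration of that behaviour (hypothetical function, plain dicts instead of DataFrames, a quadratic scan); Feast's real join also handles `created_timestamp` tie-breaking and several feature views at once.

```python
from datetime import datetime, timedelta


def point_in_time_join(entity_rows, feature_rows, ttl: timedelta, feature_names):
    """As-of join of entity rows onto timestamped feature rows, honouring a TTL.

    entity_rows:  [{"user_id": ..., "event_timestamp": datetime}, ...]
    feature_rows: [{"user_id": ..., "event_timestamp": datetime, <features>}, ...]
    """
    result = []
    for entity in entity_rows:
        as_of = entity["event_timestamp"]
        # Candidates: same key, not in the future, not older than the TTL
        candidates = [
            row for row in feature_rows
            if row["user_id"] == entity["user_id"]
            and as_of - ttl <= row["event_timestamp"] <= as_of
        ]
        latest = max(candidates, key=lambda r: r["event_timestamp"], default=None)
        joined = dict(entity)
        for name in feature_names:
            joined[name] = None if latest is None else latest[name]
        result.append(joined)
    return result


features = [
    {"user_id": 1001, "event_timestamp": datetime(2024, 1, 14, 12), "total_purchases": 44},
    {"user_id": 1001, "event_timestamp": datetime(2024, 1, 15, 12), "total_purchases": 45},
    {"user_id": 1002, "event_timestamp": datetime(2024, 1, 10), "total_purchases": 23},
]
entities = [
    {"user_id": 1001, "event_timestamp": datetime(2024, 1, 15, 10)},
    {"user_id": 1002, "event_timestamp": datetime(2024, 1, 15, 11)},
]
for row in point_in_time_join(entities, features, timedelta(days=1), ["total_purchases"]):
    print(row)  # 1001 gets 44 (45 is in the future); 1002 gets None (23 is older than the TTL)
```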

4.3 Online Store (Inference)

"""
온라인 ν”Όμ²˜ 쑰회 - μ‹€μ‹œκ°„ μΆ”λ‘ 
"""

from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")

# 온라인 μŠ€ν† μ–΄μ— ν”Όμ²˜ λ‘œλ“œ (materialization)
# μ˜€ν”„λΌμΈ β†’ 온라인 동기화
store.materialize_incremental(end_date=datetime.now())

# λ˜λŠ” 전체 κΈ°κ°„ μž¬λ™κΈ°ν™”
# store.materialize(
#     start_date=datetime(2024, 1, 1),
#     end_date=datetime.now()
# )

# Online feature lookup (low latency)
feature_vector = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_purchase_amount",
        "user_features:days_since_last_purchase",
        "user_derived_features:purchase_frequency"
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002}
    ]
).to_dict()

print(feature_vector)
# Example output (values are illustrative):
# {
#     "user_id": [1001, 1002],
#     "total_purchases": [45, 23],
#     "avg_purchase_amount": [67.5, 89.2],
#     ...
# }
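`materialize_incremental` differs from `materialize` in that Feast remembers where the previous run stopped and only loads the interval since then; when there is no earlier run, the interval starts at `now - ttl` of the feature view. The toy sketch below models just that watermark logic with a hypothetical class; it illustrates the idea and is not Feast's code.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


class IncrementalMaterializer:
    """Hypothetical tracker of the interval each incremental run would load."""

    def __init__(self, ttl: timedelta):
        self.ttl = ttl
        self.last_end: Optional[datetime] = None  # end of the previous run, if any

    def next_interval(self, end: datetime) -> Tuple[datetime, datetime]:
        """Return (start, end) for this run and advance the watermark to `end`."""
        start = self.last_end if self.last_end is not None else end - self.ttl
        self.last_end = end
        return start, end


m = IncrementalMaterializer(ttl=timedelta(days=1))
print(m.next_interval(datetime(2024, 1, 15)))     # first run: starts at end - ttl
print(m.next_interval(datetime(2024, 1, 15, 6)))  # later runs: start where the last one stopped
```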

4.4 Feature Server

# Start the feature server (REST API); run from inside feature_repo/
feast serve -p 6566

# Call the API
curl -X POST "http://localhost:6566/get-online-features" \
  -H "Content-Type: application/json" \
  -d '{
    "features": [
      "user_features:total_purchases",
      "user_features:avg_purchase_amount"
    ],
    "entities": {
      "user_id": [1001, 1002]
    }
  }'
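From Python, the same endpoint can be called with only the standard library. The sketch below builds the request body used in the curl call and reshapes the response into one dict per entity. The response layout it assumes (`metadata.feature_names` plus a `results` list holding one `values` array per feature, in the same order) is what recent Feast feature servers return as far as we know, but verify it against your version; the URL and helper names are assumptions.

```python
import json
import urllib.request

FEATURE_SERVER_URL = "http://localhost:6566/get-online-features"  # assumed host and port


def build_request_body(features, user_ids) -> bytes:
    """JSON body with the same shape as the curl example."""
    return json.dumps({"features": features, "entities": {"user_id": user_ids}}).encode("utf-8")


def rows_from_response(payload: dict) -> list:
    """Turn the column-oriented response into one dict per entity (assumed layout)."""
    names = payload["metadata"]["feature_names"]
    columns = [result["values"] for result in payload["results"]]
    return [dict(zip(names, values)) for values in zip(*columns)]


def fetch_online_features(features, user_ids, url: str = FEATURE_SERVER_URL) -> list:
    """POST to the feature server and return one dict per requested user_id."""
    request = urllib.request.Request(
        url,
        data=build_request_body(features, user_ids),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return rows_from_response(json.load(response))
```

Keeping the parsing in `rows_from_response` separate from the network call makes it easy to test against a saved response.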

5. Integrating Feature Serving

5.1 μΆ”λ‘  νŒŒμ΄ν”„λΌμΈ 톡합

"""
λͺ¨λΈ 좔둠에 Feature Store 톡합
"""

from feast import FeatureStore
import joblib
import numpy as np

class ModelWithFeatureStore:
    """Feature Store 톡합 λͺ¨λΈ μ„œλ²„"""

    def __init__(self, model_path: str, feature_repo_path: str):
        self.model = joblib.load(model_path)
        self.store = FeatureStore(repo_path=feature_repo_path)
        # Same features, in the same order, as used when training the model
        self.feature_list = [
            "user_features:total_purchases",
            "user_features:avg_purchase_amount",
            "user_features:days_since_last_purchase",
            "user_features:tenure_months",
            "user_derived_features:purchase_frequency"
        ]

    def predict(self, user_id: int) -> dict:
        """ν”Όμ²˜ 쑰회 ν›„ 예츑"""
        # 1. Feature Storeμ—μ„œ ν”Όμ²˜ 쑰회
        features = self.store.get_online_features(
            features=self.feature_list,
            entity_rows=[{"user_id": user_id}]
        ).to_dict()

        # 2. Build the feature vector (to_dict() keys omit the "view:" prefix)
        feature_names = [f.split(":")[1] for f in self.feature_list]
        feature_vector = np.array([
            [features[name][0] for name in feature_names]
        ])

        # 3. Predict
        prediction = self.model.predict(feature_vector)[0]
        probability = self.model.predict_proba(feature_vector)[0]

        return {
            "user_id": user_id,
            "prediction": int(prediction),
            "probability": probability.tolist(),
            "features_used": dict(zip(feature_names, feature_vector[0].tolist()))
        }

# Usage
model_server = ModelWithFeatureStore(
    model_path="models/churn_model.pkl",
    feature_repo_path="./feature_repo"
)

result = model_server.predict(user_id=1001)
print(result)
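Two details in `predict` are easy to get wrong: the column order must match the order used at training time, and `get_online_features(...).to_dict()` returns `None` for values missing from the online store, which most models cannot accept. A stdlib sketch of a stricter vector builder that works on the dict shape returned by `to_dict()`; the function name and the choice to raise `KeyError` are assumptions.

```python
def build_feature_vector(online_features: dict, feature_refs: list, row: int = 0) -> list:
    """Return the values for `row`, ordered as in `feature_refs`, or raise if any is missing."""
    vector, missing = [], []
    for ref in feature_refs:
        name = ref.split(":", 1)[1]  # to_dict() keys drop the "view:" prefix by default
        values = online_features.get(name)
        value = values[row] if values is not None and row < len(values) else None
        if value is None:
            missing.append(name)
        vector.append(value)
    if missing:
        raise KeyError(f"missing online features: {missing}")
    return vector


refs = ["user_features:total_purchases", "user_features:avg_purchase_amount"]
print(build_feature_vector(
    {"user_id": [1001], "total_purchases": [45], "avg_purchase_amount": [67.5]}, refs
))  # [45, 67.5]
```

The result can be wrapped as `np.array([vector])` before calling `model.predict`, exactly as in `ModelWithFeatureStore.predict`.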

5.2 FastAPI Integration

"""
FastAPI + Feature Store
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
import joblib
import numpy as np

app = FastAPI()
store = FeatureStore(repo_path="./feature_repo")
model = joblib.load("models/churn_model.pkl")

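# Must be the same features, in the same order, as the model was trained on (see 5.1)
FEATURES = [
    "user_features:total_purchases",
    "user_features:avg_purchase_amount",
    "user_features:days_since_last_purchase",
    "user_features:tenure_months",
    "user_derived_features:purchase_frequency"
]

class PredictionRequest(BaseModel):
    user_id: int

class PredictionResponse(BaseModel):
    user_id: int
    prediction: int
    probability: list
    features: dict

# Declared with plain `def`: FastAPI runs sync endpoints in a thread pool, so the
# blocking feature store and model calls do not block the event loop
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    """Look up features and return a prediction"""
    try:
        # Fetch features from the feature store
        features = store.get_online_features(
            features=FEATURES,
            entity_rows=[{"user_id": request.user_id}]
        ).to_dict()

        # Build the feature vector; missing values come back as None
        feature_names = [f.split(":")[1] for f in FEATURES]
        values = {name: features[name][0] for name in feature_names}
        if any(v is None for v in values.values()):
            raise HTTPException(status_code=404, detail="No features found for this user_id")
        feature_vector = np.array([[values[name] for name in feature_names]])

        # Predict
        pred = model.predict(feature_vector)[0]
        prob = model.predict_proba(feature_vector)[0]

        return PredictionResponse(
            user_id=request.user_id,
            prediction=int(pred),
            probability=prob.tolist(),
            features=values
        )

    except HTTPException:
        raise  # keep the intended status code (e.g. 404)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))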

@app.get("/health")
async def health():
    return {"status": "healthy"}

6. ν”Όμ²˜ μ—”μ§€λ‹ˆμ–΄λ§ νŒŒμ΄ν”„λΌμΈ

6.1 배치 ν”Όμ²˜ νŒŒμ΄ν”„λΌμΈ

"""
배치 ν”Όμ²˜ 계산 νŒŒμ΄ν”„λΌμΈ
"""

import pandas as pd
from datetime import datetime, timedelta

class FeatureEngineeringPipeline:
    """ν”Όμ²˜ μ—”μ§€λ‹ˆμ–΄λ§ νŒŒμ΄ν”„λΌμΈ"""

    def __init__(self, raw_data_path: str, output_path: str):
        self.raw_data_path = raw_data_path
        self.output_path = output_path

    def load_raw_data(self) -> pd.DataFrame:
        """μ›μ‹œ 데이터 λ‘œλ“œ"""
        return pd.read_parquet(self.raw_data_path)

    def compute_user_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """μ‚¬μš©μž ν”Όμ²˜ 계산"""
        user_features = df.groupby("user_id").agg({
            "transaction_amount": ["count", "sum", "mean"],
            "transaction_date": ["min", "max"]
        }).reset_index()

        user_features.columns = [
            "user_id", "total_purchases", "total_amount",
            "avg_purchase_amount", "first_purchase_date", "last_purchase_date"
        ]

        # Additional features
        today = datetime.now()
        user_features["days_since_last_purchase"] = (
            today - pd.to_datetime(user_features["last_purchase_date"])
        ).dt.days

        user_features["tenure_months"] = (
            pd.to_datetime(user_features["last_purchase_date"]) -
            pd.to_datetime(user_features["first_purchase_date"])
        ).dt.days // 30

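        # Timestamps: every row is stamped with the time this pipeline runs
        # NOTE: user_features (3.1) also declares customer_segment, which is not
        # computed here; add that column or drop it from the schema before materializing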
        user_features["event_timestamp"] = today
        user_features["created_timestamp"] = today

        return user_features

    def validate_features(self, df: pd.DataFrame) -> bool:
        """ν”Όμ²˜ 검증"""
        # Null 체크
        if df.isnull().sum().sum() > 0:
            print("Warning: Null values detected")
            return False

        # Range check
        if (df["avg_purchase_amount"] < 0).any():
            print("Warning: Negative values in avg_purchase_amount")
            return False

        return True

    def save_features(self, df: pd.DataFrame):
        """ν”Όμ²˜ μ €μž₯"""
        df.to_parquet(
            self.output_path,
            index=False,
            engine="pyarrow"
        )
        print(f"Features saved to {self.output_path}")

    def run(self):
        """νŒŒμ΄ν”„λΌμΈ μ‹€ν–‰"""
        print("Loading raw data...")
        raw_df = self.load_raw_data()

        print("Computing features...")
        features_df = self.compute_user_features(raw_df)

        print("Validating features...")
        if not self.validate_features(features_df):
            raise ValueError("Feature validation failed")

        print("Saving features...")
        self.save_features(features_df)

        return features_df

# Run
pipeline = FeatureEngineeringPipeline(
    raw_data_path="data/raw_transactions.parquet",
    output_path="feature_repo/data/user_features.parquet"
)
features = pipeline.run()

# Sync the new features into the Feast online store
store = FeatureStore(repo_path="./feature_repo")
store.materialize_incremental(end_date=datetime.now())
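Because `compute_user_features` stamps every row with the time the pipeline runs, a historical query for an earlier `event_timestamp` (such as the 2024-01-15 rows in section 4.2) would find no matching values. One way to build a usable history is to compute snapshots as of several dates, each using only the transactions up to that date. A simplified stdlib sketch of that idea (hypothetical function, three of the features above, raw transactions as `(user_id, timestamp, amount)` tuples):

```python
from datetime import datetime


def snapshot_features(transactions, as_of: datetime) -> dict:
    """Per-user features computed only from transactions at or before `as_of`.

    Returns {user_id: {...features..., "event_timestamp": as_of}}.
    """
    per_user = {}
    for user_id, ts, amount in transactions:
        if ts > as_of:
            continue  # skip anything the model could not have known yet
        stats = per_user.setdefault(user_id, {"count": 0, "total": 0.0, "last": ts})
        stats["count"] += 1
        stats["total"] += amount
        stats["last"] = max(stats["last"], ts)
    return {
        user_id: {
            "total_purchases": s["count"],
            "avg_purchase_amount": s["total"] / s["count"],
            "days_since_last_purchase": (as_of - s["last"]).days,
            "event_timestamp": as_of,
        }
        for user_id, s in per_user.items()
    }


txns = [
    (1001, datetime(2024, 1, 1), 50.0),
    (1001, datetime(2024, 1, 10), 70.0),
    (1001, datetime(2024, 1, 20), 100.0),
]
print(snapshot_features(txns, datetime(2024, 1, 15)))  # ignores the 2024-01-20 purchase
```

Running this for a series of `as_of` dates and concatenating the results gives the offline store one row per user per snapshot, which is what point-in-time retrieval needs.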

6.2 슀트리밍 ν”Όμ²˜ μ—…λ°μ΄νŠΈ

"""
슀트리밍 ν”Όμ²˜ μ—…λ°μ΄νŠΈ (Feast Push)
"""

from feast import FeatureStore
import pandas as pd
from datetime import datetime

store = FeatureStore(repo_path="./feature_repo")

def process_streaming_event(event: dict):
    """μ‹€μ‹œκ°„ 이벀트 처리 및 ν”Όμ²˜ μ—…λ°μ΄νŠΈ"""
    # μ΄λ²€νŠΈμ—μ„œ ν”Όμ²˜ μΆ”μΆœ
    feature_df = pd.DataFrame([{
        "user_id": event["user_id"],
        "transaction_amount": event["amount"],
        "transaction_timestamp": datetime.now()
    }])

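    # Push to the feature store. The DataFrame columns must match the schema of every
    # feature view fed by this push source (the StreamFeatureView in 3.2 expects
    # transaction_count_1h / total_amount_1h, so those must be computed before pushing)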
    store.push(
        push_source_name="transaction_push_source",
        df=feature_df
    )

# Kafka consumer example (requires `import json` and a configured consumer)
# for message in kafka_consumer:
#     event = json.loads(message.value)
#     process_streaming_event(event)
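The consumer loop above needs a way to turn raw messages into rows. A stdlib sketch of that parsing step; the message fields `user_id`, `amount`, and an optional ISO-8601 `timestamp` are assumptions about the event schema, and naive timestamps are assumed to be UTC. It prefers the event's own timestamp over the current time, so late-arriving events keep their real event time.

```python
import json
from datetime import datetime, timezone


def parse_transaction_event(raw: bytes) -> dict:
    """Decode one message into a push row; fall back to the current UTC time."""
    event = json.loads(raw)
    if "timestamp" in event:
        ts = datetime.fromisoformat(event["timestamp"])
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive times are UTC
    else:
        ts = datetime.now(timezone.utc)
    return {
        "user_id": int(event["user_id"]),
        "transaction_amount": float(event["amount"]),
        "transaction_timestamp": ts,
    }


print(parse_transaction_event(b'{"user_id": "1001", "amount": 12.5, "timestamp": "2024-01-15T10:00:00"}'))
```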

Exercises

Exercise 1: Feature Store Setup

Set up a Feast feature store and define some basic features.

Exercise 2: Generating Training Data

Use get_historical_features to build point-in-time correct training data.

Exercise 3: Online Serving

Set up the online store and integrate it into real-time inference.


Summary

Component          Description                      Use case
Offline Store      Large-scale historical data      Building training datasets
Online Store       Low-latency key-value lookups    Real-time inference
Feature Registry   Feature metadata management      Feature discovery, governance
Materialization    Offline β†’ online sync            Preparing features for serving
