19. 베이지안 추론 (Bayesian Inference)
19. 베이지안 추론 (Bayesian Inference)¶
이전: 베이지안 통계 기초 | 다음: 시계열 분석 기초
개요¶
켤레 사전분포가 적용되지 않는 복잡한 모델에서는 사후분포를 해석적으로 구할 수 없습니다. 이 장에서는 MCMC(Markov Chain Monte Carlo) 방법과 PyMC 라이브러리를 사용한 베이지안 추론을 학습합니다.
1. MCMC 소개¶
1.1 왜 MCMC가 필요한가?¶
문제: 복잡한 사후분포의 정규화 상수를 계산하기 어려움
$$P(\theta|D) = \frac{P(D|\theta)P(\theta)}{\int P(D|\theta)P(\theta)d\theta}$$
분모의 적분이 고차원에서 계산 불가능할 수 있음.
해결: 사후분포에서 직접 샘플링하여 분포를 근사
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
# MCMC의 핵심 아이디어 시각화
np.random.seed(42)
# 목표 분포: 혼합 정규분포 (정규화 상수 없이도 비율만 알면 됨)
def target_unnormalized(x):
"""비정규화 목표 분포"""
return 0.3 * np.exp(-0.5 * ((x - 2) / 0.5)**2) + \
0.7 * np.exp(-0.5 * ((x + 1) / 1)**2)
x_range = np.linspace(-5, 5, 200)
y = target_unnormalized(x_range)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 비정규화 목표 분포
axes[0].plot(x_range, y, 'b-', lw=2, label='Unnormalized target')
axes[0].fill_between(x_range, y, alpha=0.3)
axes[0].set_xlabel('x')
axes[0].set_ylabel('Unnormalized density')
axes[0].set_title('비정규화 목표 분포')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# MCMC 샘플의 히스토그램 (나중에 채울 예정)
axes[1].text(0.5, 0.5, 'MCMC 샘플\n(다음 섹션에서 채움)',
transform=axes[1].transAxes, ha='center', va='center', fontsize=14)
axes[1].set_xlabel('x')
axes[1].set_ylabel('밀도')
axes[1].set_title('MCMC 샘플 히스토그램')
plt.tight_layout()
plt.show()
1.2 마르코프 체인 기초¶
정의: 다음 상태가 현재 상태에만 의존하는 확률적 과정
$$P(X_{t+1}|X_1, X_2, ..., X_t) = P(X_{t+1}|X_t)$$
def simple_markov_chain_demo():
"""간단한 마르코프 체인 시뮬레이션"""
# 날씨 전이 확률: 맑음(0), 흐림(1), 비(2)
transition_matrix = np.array([
[0.7, 0.2, 0.1], # 맑음 → 맑음/흐림/비
[0.3, 0.4, 0.3], # 흐림 → 맑음/흐림/비
[0.2, 0.3, 0.5], # 비 → 맑음/흐림/비
])
states = ['맑음', '흐림', '비']
# 시뮬레이션
n_steps = 1000
current_state = 0 # 맑음에서 시작
chain = [current_state]
for _ in range(n_steps):
probs = transition_matrix[current_state]
current_state = np.random.choice([0, 1, 2], p=probs)
chain.append(current_state)
# 정상 분포 추정
unique, counts = np.unique(chain, return_counts=True)
empirical_dist = counts / len(chain)
# 이론적 정상 분포 (전이행렬의 왼쪽 고유벡터)
eigenvalues, eigenvectors = np.linalg.eig(transition_matrix.T)
stationary_idx = np.argmin(np.abs(eigenvalues - 1))
stationary_dist = eigenvectors[:, stationary_idx].real
stationary_dist = stationary_dist / stationary_dist.sum()
print("=== 마르코프 체인 날씨 시뮬레이션 ===")
print(f"시뮬레이션 단계: {n_steps}")
print("\n상태별 빈도:")
for i, state in enumerate(states):
print(f" {state}: 경험적={empirical_dist[i]:.3f}, 이론적={stationary_dist[i]:.3f}")
return chain
chain = simple_markov_chain_demo()
2. Metropolis-Hastings 알고리즘¶
2.1 알고리즘 개요¶
목표: 목표 분포 π(x)에서 샘플 추출
알고리즘: 1. 초기값 x₀ 선택 2. 제안 분포 q(x'|x)에서 후보 x' 생성 3. 수락 확률 계산: α = min(1, [π(x')q(x|x')] / [π(x)q(x'|x)]) 4. U ~ Uniform(0,1)이면: - U < α: x' 수락 (x_{t+1} = x') - 그렇지 않으면: x 유지 (x_{t+1} = x) 5. 2-4 반복
2.2 구현¶
def metropolis_hastings(target, proposal_std, n_samples, initial_value=0, burn_in=1000):
"""
Metropolis-Hastings 알고리즘
Parameters:
-----------
target : callable
비정규화 목표 밀도 함수
proposal_std : float
제안 분포(정규분포)의 표준편차
n_samples : int
생성할 샘플 수
initial_value : float
초기값
burn_in : int
burn-in 기간 (버릴 초기 샘플 수)
Returns:
--------
samples : ndarray
MCMC 샘플
acceptance_rate : float
수락률
"""
samples = np.zeros(n_samples + burn_in)
samples[0] = initial_value
n_accepted = 0
for i in range(1, n_samples + burn_in):
current = samples[i-1]
# 제안 (대칭 정규분포)
proposal = np.random.normal(current, proposal_std)
# 수락 확률 (대칭 제안분포이므로 q 취소됨)
acceptance_ratio = target(proposal) / target(current)
acceptance_prob = min(1, acceptance_ratio)
# 수락/거부
if np.random.uniform() < acceptance_prob:
samples[i] = proposal
n_accepted += 1
else:
samples[i] = current
# burn-in 제거
samples = samples[burn_in:]
acceptance_rate = n_accepted / (n_samples + burn_in)
return samples, acceptance_rate
# 예제: 혼합 정규분포에서 샘플링
def mixture_target(x):
"""혼합 정규분포 (비정규화)"""
return 0.3 * np.exp(-0.5 * ((x - 2) / 0.5)**2) + \
0.7 * np.exp(-0.5 * ((x + 1) / 1)**2)
# 다양한 제안 분포로 실험
proposal_stds = [0.1, 1.0, 5.0]
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
np.random.seed(42)
for i, proposal_std in enumerate(proposal_stds):
samples, acc_rate = metropolis_hastings(
mixture_target,
proposal_std=proposal_std,
n_samples=10000,
burn_in=1000
)
# 트레이스 플롯
axes[0, i].plot(samples[:500], 'b-', alpha=0.7, lw=0.5)
axes[0, i].set_xlabel('Iteration')
axes[0, i].set_ylabel('Value')
axes[0, i].set_title(f'Trace (σ={proposal_std}, acc={acc_rate:.2%})')
axes[0, i].grid(True, alpha=0.3)
# 히스토그램
x_range = np.linspace(-5, 5, 200)
true_density = mixture_target(x_range)
true_density = true_density / np.trapz(true_density, x_range)
axes[1, i].hist(samples, bins=50, density=True, alpha=0.7, label='MCMC samples')
axes[1, i].plot(x_range, true_density, 'r-', lw=2, label='True density')
axes[1, i].set_xlabel('Value')
axes[1, i].set_ylabel('Density')
axes[1, i].set_title(f'Histogram (σ={proposal_std})')
axes[1, i].legend()
axes[1, i].grid(True, alpha=0.3)
plt.suptitle('Metropolis-Hastings: 제안 분포 표준편차의 영향')
plt.tight_layout()
plt.show()
2.3 다변량 확장¶
def metropolis_hastings_2d(target, proposal_cov, n_samples, initial_value=None, burn_in=1000):
"""
2차원 Metropolis-Hastings
Parameters:
-----------
target : callable
비정규화 2D 목표 밀도 함수
proposal_cov : ndarray
제안 분포의 공분산 행렬 (2x2)
n_samples : int
생성할 샘플 수
"""
if initial_value is None:
initial_value = np.array([0.0, 0.0])
samples = np.zeros((n_samples + burn_in, 2))
samples[0] = initial_value
n_accepted = 0
for i in range(1, n_samples + burn_in):
current = samples[i-1]
# 다변량 정규분포에서 제안
proposal = np.random.multivariate_normal(current, proposal_cov)
# 수락 확률
acceptance_ratio = target(proposal) / (target(current) + 1e-10)
acceptance_prob = min(1, acceptance_ratio)
if np.random.uniform() < acceptance_prob:
samples[i] = proposal
n_accepted += 1
else:
samples[i] = current
samples = samples[burn_in:]
acceptance_rate = n_accepted / (n_samples + burn_in)
return samples, acceptance_rate
# 2D 혼합 정규분포
def target_2d(x):
"""2D 혼합 정규분포"""
# 첫 번째 성분
mu1 = np.array([2, 2])
cov1 = np.array([[0.5, 0.3], [0.3, 0.5]])
term1 = stats.multivariate_normal(mu1, cov1).pdf(x)
# 두 번째 성분
mu2 = np.array([-1, -1])
cov2 = np.array([[0.8, -0.2], [-0.2, 0.8]])
term2 = stats.multivariate_normal(mu2, cov2).pdf(x)
return 0.4 * term1 + 0.6 * term2
# 샘플링
np.random.seed(42)
proposal_cov = np.array([[0.5, 0], [0, 0.5]])
samples_2d, acc_rate = metropolis_hastings_2d(
target_2d,
proposal_cov=proposal_cov,
n_samples=20000,
burn_in=5000
)
print(f"수락률: {acc_rate:.2%}")
# 시각화
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# 트레이스 플롯
axes[0].plot(samples_2d[:1000, 0], 'b-', alpha=0.7, lw=0.5, label='x1')
axes[0].plot(samples_2d[:1000, 1], 'r-', alpha=0.7, lw=0.5, label='x2')
axes[0].set_xlabel('Iteration')
axes[0].set_ylabel('Value')
axes[0].set_title('Trace Plot (first 1000)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2D 산점도
axes[1].scatter(samples_2d[::10, 0], samples_2d[::10, 1], alpha=0.3, s=5)
axes[1].set_xlabel('x1')
axes[1].set_ylabel('x2')
axes[1].set_title('MCMC Samples (2D)')
axes[1].grid(True, alpha=0.3)
# 등고선과 비교
x1_range = np.linspace(-4, 5, 100)
x2_range = np.linspace(-4, 5, 100)
X1, X2 = np.meshgrid(x1_range, x2_range)
positions = np.dstack((X1, X2))
Z = np.array([[target_2d(p) for p in row] for row in positions])
axes[2].contour(X1, X2, Z, levels=10, cmap='viridis')
axes[2].scatter(samples_2d[::20, 0], samples_2d[::20, 1], alpha=0.2, s=3, c='red')
axes[2].set_xlabel('x1')
axes[2].set_ylabel('x2')
axes[2].set_title('Samples vs True Density')
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
3. PyMC 소개¶
3.1 설치 및 기본 설정¶
# PyMC 설치 (Python 3.9+)
pip install pymc arviz
# 또는 conda
conda install -c conda-forge pymc arviz
# 기본 import
import pymc as pm
import arviz as az
# 시각화 설정
az.style.use("arviz-darkgrid")
print(f"PyMC version: {pm.__version__}")
print(f"ArviZ version: {az.__version__}")
3.2 첫 번째 PyMC 모델: 동전 던지기¶
import pymc as pm
import arviz as az
# 데이터
n_trials = 100
n_heads = 65
# PyMC 모델 정의
with pm.Model() as coin_model:
# 사전분포: Beta(1, 1) = Uniform(0, 1)
p = pm.Beta('p', alpha=1, beta=1)
# 가능도: Binomial
y = pm.Binomial('y', n=n_trials, p=p, observed=n_heads)
# 사후분포 샘플링
trace = pm.sample(2000, tune=1000, cores=1, random_seed=42)
# 결과 확인
print(az.summary(trace, var_names=['p']))
# 시각화
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
# 트레이스 플롯
az.plot_trace(trace, var_names=['p'], axes=axes.reshape(1, 2))
plt.tight_layout()
plt.show()
# 사후분포 플롯
fig, ax = plt.subplots(figsize=(10, 5))
az.plot_posterior(trace, var_names=['p'], ax=ax)
ax.axvline(0.65, color='r', linestyle='--', label=f'MLE = 0.65')
ax.legend()
plt.show()
3.3 정규분포 모수 추정¶
# 데이터 생성
np.random.seed(42)
true_mu = 5.0
true_sigma = 2.0
data = np.random.normal(true_mu, true_sigma, 50)
print(f"True μ: {true_mu}, σ: {true_sigma}")
print(f"Sample mean: {data.mean():.3f}, std: {data.std():.3f}")
# PyMC 모델
with pm.Model() as normal_model:
# 사전분포
mu = pm.Normal('mu', mu=0, sigma=10) # 약한 정보 사전분포
sigma = pm.HalfNormal('sigma', sigma=5) # 양수 제약
# 가능도
y = pm.Normal('y', mu=mu, sigma=sigma, observed=data)
# 샘플링
trace = pm.sample(2000, tune=1000, cores=1, random_seed=42)
# 결과 요약
print("\n=== 사후분포 요약 ===")
print(az.summary(trace, var_names=['mu', 'sigma']))
# 시각화
fig = plt.figure(figsize=(14, 8))
# 트레이스 플롯
axes = fig.subplots(2, 2)
az.plot_trace(trace, var_names=['mu', 'sigma'], axes=axes)
plt.tight_layout()
plt.show()
# 결합 사후분포
fig, ax = plt.subplots(figsize=(8, 8))
az.plot_pair(trace, var_names=['mu', 'sigma'], kind='kde', ax=ax)
ax.axvline(true_mu, color='r', linestyle='--', alpha=0.5)
ax.axhline(true_sigma, color='r', linestyle='--', alpha=0.5)
ax.scatter([true_mu], [true_sigma], color='r', s=100, marker='x', label='True values')
ax.legend()
plt.show()
4. 베이지안 선형 회귀¶
4.1 모델 정의¶
$$y_i = \beta_0 + \beta_1 x_i + \epsilon_i$$
$$\epsilon_i \sim N(0, \sigma^2)$$
베이지안 설정: - β₀ ~ N(0, 10²) - β₁ ~ N(0, 10²) - σ ~ HalfNormal(5)
4.2 PyMC 구현¶
# 데이터 생성
np.random.seed(42)
n = 100
x = np.random.uniform(0, 10, n)
true_beta0 = 2.5
true_beta1 = 1.5
true_sigma = 1.0
y = true_beta0 + true_beta1 * x + np.random.normal(0, true_sigma, n)
print(f"True parameters: β0={true_beta0}, β1={true_beta1}, σ={true_sigma}")
# 빈도주의 OLS 비교
from scipy import stats as scipy_stats
slope_ols, intercept_ols, _, _, _ = scipy_stats.linregress(x, y)
print(f"OLS estimates: β0={intercept_ols:.3f}, β1={slope_ols:.3f}")
# 베이지안 선형 회귀
with pm.Model() as linear_model:
# 사전분포
beta0 = pm.Normal('beta0', mu=0, sigma=10)
beta1 = pm.Normal('beta1', mu=0, sigma=10)
sigma = pm.HalfNormal('sigma', sigma=5)
# 선형 예측
mu = beta0 + beta1 * x
# 가능도
y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y)
# 샘플링
trace = pm.sample(3000, tune=1000, cores=1, random_seed=42)
# 결과
print("\n=== 베이지안 추정 결과 ===")
summary = az.summary(trace, var_names=['beta0', 'beta1', 'sigma'])
print(summary)
# 시각화
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
# 사후분포
az.plot_posterior(trace, var_names=['beta0'], ax=axes[0, 0])
axes[0, 0].axvline(true_beta0, color='r', linestyle='--', label='True')
axes[0, 0].axvline(intercept_ols, color='g', linestyle=':', label='OLS')
axes[0, 0].legend()
az.plot_posterior(trace, var_names=['beta1'], ax=axes[0, 1])
axes[0, 1].axvline(true_beta1, color='r', linestyle='--', label='True')
axes[0, 1].axvline(slope_ols, color='g', linestyle=':', label='OLS')
axes[0, 1].legend()
az.plot_posterior(trace, var_names=['sigma'], ax=axes[0, 2])
axes[0, 2].axvline(true_sigma, color='r', linestyle='--', label='True')
axes[0, 2].legend()
# 회귀선 불확실성
ax = axes[1, 0]
ax.scatter(x, y, alpha=0.5, s=20)
# 사후 샘플에서 회귀선
x_plot = np.linspace(0, 10, 100)
posterior_samples = trace.posterior
# 100개의 사후 샘플로 회귀선 그리기
for i in range(100):
b0 = posterior_samples['beta0'].values.flatten()[i * 30] # thin
b1 = posterior_samples['beta1'].values.flatten()[i * 30]
ax.plot(x_plot, b0 + b1 * x_plot, 'b-', alpha=0.05)
# 사후 평균 회귀선
mean_b0 = posterior_samples['beta0'].mean().values
mean_b1 = posterior_samples['beta1'].mean().values
ax.plot(x_plot, mean_b0 + mean_b1 * x_plot, 'r-', lw=2, label='Posterior mean')
ax.plot(x_plot, true_beta0 + true_beta1 * x_plot, 'g--', lw=2, label='True')
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_title('회귀선 불확실성')
ax.legend()
ax.grid(True, alpha=0.3)
# 결합 사후분포: beta0 vs beta1
ax = axes[1, 1]
az.plot_pair(trace, var_names=['beta0', 'beta1'], kind='kde', ax=ax)
ax.scatter([true_beta0], [true_beta1], color='r', s=100, marker='x', zorder=5)
ax.set_title('β0 vs β1 결합분포')
# 트레이스 플롯
ax = axes[1, 2]
az.plot_trace(trace, var_names=['beta1'], axes=np.array([[ax, ax]]))
ax.set_title('β1 트레이스')
plt.tight_layout()
plt.show()
4.3 사후 예측¶
# 사후 예측분포
with linear_model:
# 새로운 x 값에 대한 예측
x_new = np.array([3, 5, 7])
# 사후 예측 샘플
pm.set_data({'x': x_new}) # 만약 shared 변수 사용 시
# 수동으로 사후 예측
posterior = trace.posterior
beta0_samples = posterior['beta0'].values.flatten()
beta1_samples = posterior['beta1'].values.flatten()
sigma_samples = posterior['sigma'].values.flatten()
# 예측
predictions = {}
for x_val in x_new:
mu_pred = beta0_samples + beta1_samples * x_val
y_pred = np.random.normal(mu_pred, sigma_samples)
predictions[x_val] = {
'mean': mu_pred.mean(),
'std': mu_pred.std(),
'pred_mean': y_pred.mean(),
'pred_std': y_pred.std(),
'ci_95': (np.percentile(mu_pred, 2.5), np.percentile(mu_pred, 97.5)),
'pred_95': (np.percentile(y_pred, 2.5), np.percentile(y_pred, 97.5))
}
print("=== 사후 예측 ===")
for x_val, pred in predictions.items():
print(f"\nx = {x_val}:")
print(f" E[y|x] = {pred['mean']:.3f} ± {pred['std']:.3f}")
print(f" 95% CI for E[y|x]: ({pred['ci_95'][0]:.3f}, {pred['ci_95'][1]:.3f})")
print(f" 95% PI for y: ({pred['pred_95'][0]:.3f}, {pred['pred_95'][1]:.3f})")
5. 모델 비교¶
5.1 WAIC (Widely Applicable Information Criterion)¶
# 두 모델 비교: 단순 vs 이차 회귀
# 이차 데이터 생성
np.random.seed(42)
n = 100
x = np.random.uniform(0, 10, n)
y_quad = 2 + 0.5 * x + 0.1 * x**2 + np.random.normal(0, 1.5, n)
# 모델 1: 선형
with pm.Model() as model_linear:
beta0 = pm.Normal('beta0', mu=0, sigma=10)
beta1 = pm.Normal('beta1', mu=0, sigma=10)
sigma = pm.HalfNormal('sigma', sigma=5)
mu = beta0 + beta1 * x
y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y_quad)
trace_linear = pm.sample(2000, tune=1000, cores=1, random_seed=42)
# WAIC 계산을 위한 log_likelihood
pm.compute_log_likelihood(trace_linear)
# 모델 2: 이차
with pm.Model() as model_quadratic:
beta0 = pm.Normal('beta0', mu=0, sigma=10)
beta1 = pm.Normal('beta1', mu=0, sigma=10)
beta2 = pm.Normal('beta2', mu=0, sigma=10)
sigma = pm.HalfNormal('sigma', sigma=5)
mu = beta0 + beta1 * x + beta2 * x**2
y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y_quad)
trace_quadratic = pm.sample(2000, tune=1000, cores=1, random_seed=42)
pm.compute_log_likelihood(trace_quadratic)
# WAIC 비교
print("=== WAIC 비교 ===")
waic_linear = az.waic(trace_linear)
waic_quadratic = az.waic(trace_quadratic)
print(f"Linear model WAIC: {waic_linear.waic:.2f}")
print(f"Quadratic model WAIC: {waic_quadratic.waic:.2f}")
print(f"\n낮은 WAIC이 더 좋은 모델")
# 모델 비교
comparison = az.compare({
'Linear': trace_linear,
'Quadratic': trace_quadratic
}, ic='waic')
print("\n=== 모델 비교 ===")
print(comparison)
# 시각화
fig, ax = plt.subplots(figsize=(10, 5))
az.plot_compare(comparison, ax=ax)
plt.title('WAIC 모델 비교')
plt.tight_layout()
plt.show()
5.2 LOO-CV (Leave-One-Out Cross-Validation)¶
# LOO-CV (Pareto Smoothed Importance Sampling LOO)
print("=== LOO-CV 비교 ===")
loo_linear = az.loo(trace_linear)
loo_quadratic = az.loo(trace_quadratic)
print(f"Linear model LOO: {loo_linear.loo:.2f}")
print(f"Quadratic model LOO: {loo_quadratic.loo:.2f}")
# LOO 진단
print("\n=== LOO 진단: Pareto k ===")
print(f"Linear - k > 0.7인 관측치: {(loo_linear.pareto_k > 0.7).sum()}")
print(f"Quadratic - k > 0.7인 관측치: {(loo_quadratic.pareto_k > 0.7).sum()}")
# 비교
comparison_loo = az.compare({
'Linear': trace_linear,
'Quadratic': trace_quadratic
}, ic='loo')
print("\n=== LOO 모델 비교 ===")
print(comparison_loo)
5.3 베이즈 팩터 (개념)¶
def bayes_factor_example():
"""베이즈 팩터 개념 설명"""
print("=== 베이즈 팩터 (Bayes Factor) ===")
print()
print("정의: BF₁₀ = P(D|M₁) / P(D|M₀)")
print()
print("해석:")
print(" BF > 100 : 결정적 증거 (M₁ 지지)")
print(" 30 < BF < 100: 매우 강한 증거")
print(" 10 < BF < 30 : 강한 증거")
print(" 3 < BF < 10 : 중간 증거")
print(" 1 < BF < 3 : 약한 증거")
print(" BF = 1 : 증거 없음")
print(" BF < 1 : M₀ 지지")
print()
print("주의: 베이즈 팩터 계산은 일반적으로 어려움")
print(" PyMC에서는 Savage-Dickey ratio 등의 방법 사용")
bayes_factor_example()
6. 사후 예측 검사 (Posterior Predictive Checks)¶
6.1 개념¶
사후 예측 분포: 관측된 데이터를 학습한 모델이 생성할 것으로 예상되는 데이터
$$p(\tilde{y}|y) = \int p(\tilde{y}|\theta) p(\theta|y) d\theta$$
6.2 구현¶
# 포아송 회귀 예제
np.random.seed(42)
n = 100
x = np.random.uniform(0, 5, n)
true_rate = np.exp(0.5 + 0.3 * x)
y_count = np.random.poisson(true_rate)
# 잘못된 모델: 정규분포 가정
with pm.Model() as wrong_model:
beta0 = pm.Normal('beta0', mu=0, sigma=5)
beta1 = pm.Normal('beta1', mu=0, sigma=5)
sigma = pm.HalfNormal('sigma', sigma=5)
mu = beta0 + beta1 * x
y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y_count)
trace_wrong = pm.sample(2000, tune=1000, cores=1, random_seed=42)
# 사후 예측
ppc_wrong = pm.sample_posterior_predictive(trace_wrong, random_seed=42)
# 올바른 모델: 포아송 가정
with pm.Model() as correct_model:
beta0 = pm.Normal('beta0', mu=0, sigma=5)
beta1 = pm.Normal('beta1', mu=0, sigma=5)
log_rate = beta0 + beta1 * x
rate = pm.math.exp(log_rate)
y_obs = pm.Poisson('y_obs', mu=rate, observed=y_count)
trace_correct = pm.sample(2000, tune=1000, cores=1, random_seed=42)
ppc_correct = pm.sample_posterior_predictive(trace_correct, random_seed=42)
# 시각화
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 잘못된 모델의 PPC
ax = axes[0, 0]
ppc_data_wrong = ppc_wrong.posterior_predictive['y_obs'].values.flatten()
ax.hist(y_count, bins=20, density=True, alpha=0.7, label='Observed')
ax.hist(ppc_data_wrong[:len(y_count)*10], bins=20, density=True, alpha=0.5, label='PPC')
ax.set_xlabel('y')
ax.set_ylabel('Density')
ax.set_title('잘못된 모델 (Normal): PPC')
ax.legend()
# 올바른 모델의 PPC
ax = axes[0, 1]
ppc_data_correct = ppc_correct.posterior_predictive['y_obs'].values.flatten()
ax.hist(y_count, bins=20, density=True, alpha=0.7, label='Observed')
ax.hist(ppc_data_correct[:len(y_count)*10], bins=20, density=True, alpha=0.5, label='PPC')
ax.set_xlabel('y')
ax.set_ylabel('Density')
ax.set_title('올바른 모델 (Poisson): PPC')
ax.legend()
# 잔차 분석 - 잘못된 모델
ax = axes[1, 0]
mean_pred_wrong = ppc_wrong.posterior_predictive['y_obs'].mean(dim=['chain', 'draw']).values
residuals_wrong = y_count - mean_pred_wrong
ax.scatter(mean_pred_wrong, residuals_wrong, alpha=0.5)
ax.axhline(0, color='r', linestyle='--')
ax.set_xlabel('Predicted')
ax.set_ylabel('Residual')
ax.set_title('잘못된 모델: 잔차')
# 잔차 분석 - 올바른 모델
ax = axes[1, 1]
mean_pred_correct = ppc_correct.posterior_predictive['y_obs'].mean(dim=['chain', 'draw']).values
residuals_correct = y_count - mean_pred_correct
ax.scatter(mean_pred_correct, residuals_correct, alpha=0.5)
ax.axhline(0, color='r', linestyle='--')
ax.set_xlabel('Predicted')
ax.set_ylabel('Residual')
ax.set_title('올바른 모델: 잔차')
plt.tight_layout()
plt.show()
6.3 ArviZ를 사용한 PPC¶
# ArviZ PPC 플롯
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 관측 데이터를 InferenceData에 추가
idata_wrong = az.from_pymc3(trace_wrong, posterior_predictive=ppc_wrong)
idata_wrong.add_groups({'observed_data': {'y_obs': y_count}})
idata_correct = az.from_pymc3(trace_correct, posterior_predictive=ppc_correct)
idata_correct.add_groups({'observed_data': {'y_obs': y_count}})
# PPC 플롯
az.plot_ppc(idata_wrong, ax=axes[0], num_pp_samples=100)
axes[0].set_title('잘못된 모델 (Normal)')
az.plot_ppc(idata_correct, ax=axes[1], num_pp_samples=100)
axes[1].set_title('올바른 모델 (Poisson)')
plt.tight_layout()
plt.show()
7. 수렴 진단¶
7.1 R-hat (Gelman-Rubin 통계량)¶
def demonstrate_convergence_diagnostics():
"""수렴 진단 시연"""
# 여러 체인으로 샘플링
np.random.seed(42)
data = np.random.normal(5, 2, 50)
with pm.Model() as model:
mu = pm.Normal('mu', mu=0, sigma=10)
sigma = pm.HalfNormal('sigma', sigma=5)
y = pm.Normal('y', mu=mu, sigma=sigma, observed=data)
# 여러 체인
trace = pm.sample(2000, tune=1000, chains=4, cores=1, random_seed=42)
# R-hat 계산
summary = az.summary(trace)
print("=== 수렴 진단 ===")
print(summary[['mean', 'sd', 'r_hat', 'ess_bulk', 'ess_tail']])
print("\n해석:")
print(" R-hat < 1.01: 수렴 양호")
print(" ESS_bulk > 400: 벌크 ESS 충분")
print(" ESS_tail > 400: 꼬리 ESS 충분")
return trace
trace_diagnostic = demonstrate_convergence_diagnostics()
# 체인별 트레이스 플롯
fig, axes = plt.subplots(2, 2, figsize=(14, 8))
az.plot_trace(trace_diagnostic, var_names=['mu', 'sigma'], axes=axes)
plt.suptitle('다중 체인 트레이스 플롯')
plt.tight_layout()
plt.show()
7.2 자기상관 분석¶
# 자기상관 플롯
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
az.plot_autocorr(trace_diagnostic, var_names=['mu'], ax=axes[0])
axes[0].set_title('μ 자기상관')
az.plot_autocorr(trace_diagnostic, var_names=['sigma'], ax=axes[1])
axes[1].set_title('σ 자기상관')
plt.tight_layout()
plt.show()
print("자기상관이 빠르게 0으로 감소하면 좋은 혼합을 의미")
7.3 유효 표본 크기 (ESS)¶
# ESS 계산
ess = az.ess(trace_diagnostic)
print("=== 유효 표본 크기 (ESS) ===")
for var in ['mu', 'sigma']:
print(f"{var}: bulk={ess[var].values:.0f}, tail={az.ess(trace_diagnostic, method='tail')[var].values:.0f}")
print("\n권장: ESS > 400 (최소 100 이상)")
print("ESS가 낮으면 더 많은 샘플이 필요하거나 모델/샘플러 개선 필요")
8. 실습 예제¶
8.1 계층적 모델 (Hierarchical Model)¶
# 여러 학교의 학생 성적 데이터
np.random.seed(42)
n_schools = 8
n_students_per_school = np.random.randint(15, 30, n_schools)
# 진짜 학교 효과
true_school_mean = 70
true_school_std = 5
true_school_effects = np.random.normal(0, true_school_std, n_schools)
true_noise_std = 10
# 데이터 생성
schools = []
scores = []
for i in range(n_schools):
school_mean = true_school_mean + true_school_effects[i]
student_scores = np.random.normal(school_mean, true_noise_std, n_students_per_school[i])
schools.extend([i] * n_students_per_school[i])
scores.extend(student_scores)
schools = np.array(schools)
scores = np.array(scores)
print(f"학교 수: {n_schools}")
print(f"총 학생 수: {len(scores)}")
print(f"학교별 평균 점수: {[scores[schools == i].mean():.1f for i in range(n_schools)]}")
# 계층적 모델
with pm.Model() as hierarchical_model:
# 초모수 (hyperparameters)
mu_school = pm.Normal('mu_school', mu=70, sigma=20)
sigma_school = pm.HalfNormal('sigma_school', sigma=10)
# 학교 효과
school_effect = pm.Normal('school_effect', mu=0, sigma=sigma_school, shape=n_schools)
# 잔차 표준편차
sigma_residual = pm.HalfNormal('sigma_residual', sigma=10)
# 기대값
mu = mu_school + school_effect[schools]
# 가능도
y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma_residual, observed=scores)
# 샘플링
trace_hier = pm.sample(3000, tune=1500, cores=1, random_seed=42)
# 결과
print("\n=== 계층적 모델 결과 ===")
print(az.summary(trace_hier, var_names=['mu_school', 'sigma_school', 'sigma_residual']))
# 학교 효과 비교
school_effects_post = trace_hier.posterior['school_effect'].mean(dim=['chain', 'draw']).values
school_effects_ci = az.hdi(trace_hier, var_names=['school_effect'])['school_effect'].values
fig, ax = plt.subplots(figsize=(10, 6))
school_ids = np.arange(n_schools)
# 사후 평균과 신용구간
ax.errorbar(school_ids, school_effects_post,
yerr=[school_effects_post - school_effects_ci[:, 0],
school_effects_ci[:, 1] - school_effects_post],
fmt='o', capsize=5, label='Posterior (hierarchical)')
# 실제 효과
ax.scatter(school_ids + 0.1, true_school_effects, marker='x', s=100,
color='r', label='True effects', zorder=5)
# 개별 학교 평균 (no pooling)
school_means = [scores[schools == i].mean() - true_school_mean for i in range(n_schools)]
ax.scatter(school_ids - 0.1, school_means, marker='s', s=60,
color='g', alpha=0.5, label='No pooling (raw)')
ax.axhline(0, color='k', linestyle='--', alpha=0.3)
ax.set_xlabel('School ID')
ax.set_ylabel('School Effect')
ax.set_title('학교 효과: 계층적 모델 vs 개별 추정')
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()
print("\n수축(Shrinkage) 효과:")
print("계층적 모델은 극단적인 추정치를 전체 평균 방향으로 수축시킴")
9. 연습 문제¶
문제 1: MH 알고리즘 구현¶
다음 목표 분포에서 Metropolis-Hastings로 샘플을 추출하세요: - 목표: Gamma(5, 1) - 제안 분포: 현재 값에서 ±0.5 균등분포
문제 2: PyMC 모델링¶
다음 데이터에 대해 PyMC로 베이지안 추론을 수행하세요: - 20개 제품 중 3개 불량 - Beta(1,1) 사전분포 - 불량률의 사후 평균과 95% 신용구간 계산
문제 3: 모델 비교¶
선형 회귀와 다항 회귀(2차, 3차)를 비교하세요: - 데이터 생성: y = 2 + 3x + 0.5x² + noise - WAIC로 모델 선택 - 사후 예측 검사 수행
문제 4: 수렴 진단¶
여러 체인으로 샘플링하고 다음을 확인하세요: - R-hat이 1.01 미만인지 - ESS가 충분한지 - 트레이스 플롯이 잘 섞이는지
10. 핵심 요약¶
MCMC 핵심¶
- Metropolis-Hastings: 제안-수락/거부 반복
- 수락률: 20-50%가 적절
- Burn-in: 초기 불안정한 샘플 제거
PyMC 워크플로우¶
with pm.Model() as model:
# 1. 사전분포 정의
theta = pm.Distribution('theta', ...)
# 2. 가능도 정의
y = pm.Distribution('y', ..., observed=data)
# 3. 샘플링
trace = pm.sample(...)
# 4. 사후 예측
ppc = pm.sample_posterior_predictive(trace)
# 5. 진단 및 요약
az.summary(trace)
az.plot_trace(trace)
az.plot_ppc(...)
수렴 진단 체크리스트¶
- [ ] R-hat < 1.01
- [ ] ESS > 400
- [ ] 트레이스 플롯이 안정적
- [ ] 자기상관이 빠르게 감소
- [ ] 체인들이 잘 섞임
다음 장 미리보기¶
10장 시계열 분석 기초에서는: - 시계열 구성요소 (추세, 계절성, 잡음) - 정상성과 단위근 검정 - ACF/PACF 분석 - 시계열 분해