04_monte_carlo.py

  1"""
  2λͺ¬ν…ŒμΉ΄λ₯Όλ‘œ 방법 (Monte Carlo Methods) κ΅¬ν˜„
  3- First-visit MC Prediction
  4- Every-visit MC Prediction
  5- MC Control (Exploring Starts)
  6- On-policy MC Control (Ξ΅-greedy)
  7- Off-policy MC (Importance Sampling)
  8"""
  9import numpy as np
 10import matplotlib.pyplot as plt
 11from collections import defaultdict
 12import gymnasium as gym
 13
 14
def calculate_returns(episode, gamma=0.99):
    """
    Compute the return at each time step of an episode.

    Args:
        episode: list of (state, action, reward) tuples
        gamma: discount factor

    Returns:
        returns: list of (state, G) pairs, the return at each time step
    """
    G = 0  # initialize the return
    returns = []

    # Iterate backward using the recursion G_t = r_t + gamma * G_{t+1},
    # then reverse once at the end (append + reverse is O(n), unlike
    # repeated insert(0, ...))
    for t in range(len(episode) - 1, -1, -1):
        state, action, reward = episode[t]
        G = reward + gamma * G  # discounted return
        returns.append((state, G))

    returns.reverse()
    return returns


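# A minimal sanity check for calculate_returns (an addition, not in the
# original file; the two-step episode below is hypothetical). It verifies
# the recursion G_t = r_t + gamma * G_{t+1} on numbers small enough to
# check by hand.
def _demo_calculate_returns():
    episode = [("s0", 0, 1.0), ("s1", 1, 1.0)]
    # G_1 = 1.0 and G_0 = 1.0 + 0.5 * 1.0 = 1.5
    assert calculate_returns(episode, gamma=0.5) == [("s0", 1.5), ("s1", 1.0)]

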
def first_visit_mc_prediction(env, policy, n_episodes=10000, gamma=0.99):
    """
    First-visit MC policy evaluation.

    Args:
        env: Gymnasium environment
        policy: policy function, policy(state) -> action
        n_episodes: number of episodes
        gamma: discount factor

    Returns:
        V: state-value function
    """
    # Sum of returns and visit count for each state
    returns_sum = defaultdict(float)
    returns_count = defaultdict(int)
    V = defaultdict(float)

    for episode_num in range(n_episodes):
        # Generate an episode
        episode = []
        state, _ = env.reset()
        done = False

        while not done:
            action = policy(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            done = terminated or truncated

        # First-visit: iterate backward and keep overwriting, so the return
        # kept for each state is the one from its earliest visit. (A
        # "visited" set filled while iterating backward would instead record
        # the return from each state's *last* visit.)
        G = 0
        first_returns = {}
        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t  # discounted return
            first_returns[state_t] = G

        for state_t, G_t in first_returns.items():
            returns_sum[state_t] += G_t
            returns_count[state_t] += 1
            V[state_t] = returns_sum[state_t] / returns_count[state_t]

        if (episode_num + 1) % 2000 == 0:
            print(f"Episode {episode_num + 1}/{n_episodes}")

    return dict(V)


def every_visit_mc_prediction(env, policy, n_episodes=10000, gamma=0.99):
    """
    Every-visit MC policy evaluation.

    Counts every visit to a state, not just the first.
    """
    returns_sum = defaultdict(float)
    returns_count = defaultdict(int)
    V = defaultdict(float)

    for episode_num in range(n_episodes):
        episode = []
        state, _ = env.reset()
        done = False

        while not done:
            action = policy(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            done = terminated or truncated

        G = 0

        # Every-visit: update on every occurrence of a state
        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]

            G = gamma * G + reward_t

            # Count every visit
            returns_sum[state_t] += G
            returns_count[state_t] += 1
            V[state_t] = returns_sum[state_t] / returns_count[state_t]

        if (episode_num + 1) % 2000 == 0:
            print(f"Episode {episode_num + 1}/{n_episodes}")

    return dict(V)


def epsilon_greedy_policy(Q, state, n_actions, epsilon=0.1):
    """
    Ξ΅-greedy action selection.

    Args:
        Q: action-value function
        state: current state
        n_actions: number of actions
        epsilon: exploration probability

    Returns:
        action: the selected action
    """
    if np.random.random() < epsilon:
        # Explore: random action
        return np.random.randint(n_actions)
    else:
        # Exploit: best known action
        return np.argmax(Q[state])


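# Companion helper (an addition, not in the original): the probability that
# the Ξ΅-greedy behavior policy above selects a given action. This is the
# b(a|s) term that appears in the importance-sampling ratio used by
# mc_off_policy_control below. Assumes ties are broken like np.argmax,
# i.e., the lowest-index greedy action wins.
def epsilon_greedy_prob(Q, state, action, n_actions, epsilon=0.1):
    greedy_action = np.argmax(Q[state])
    if action == greedy_action:
        return (1 - epsilon) + epsilon / n_actions
    return epsilon / n_actions

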
def mc_on_policy_control(env, n_episodes=100000, gamma=0.99,
                         epsilon=0.1, epsilon_decay=0.9999):
    """
    On-policy MC control (Ξ΅-greedy).

    Args:
        env: Gymnasium environment
        n_episodes: number of episodes
        gamma: discount factor
        epsilon: exploration rate
        epsilon_decay: multiplicative decay applied to epsilon per episode

    Returns:
        Q: action-value function
        policy: learned greedy policy
        episode_rewards: total reward per episode
    """
    n_actions = env.action_space.n

    Q = defaultdict(lambda: np.zeros(n_actions))
    returns_sum = defaultdict(lambda: np.zeros(n_actions))
    returns_count = defaultdict(lambda: np.zeros(n_actions))

    episode_rewards = []

    print("Starting MC on-policy control training...")
    for episode_num in range(n_episodes):
        episode = []
        state, _ = env.reset()
        done = False
        total_reward = 0

        # Generate an episode with the Ξ΅-greedy policy
        while not done:
            action = epsilon_greedy_policy(Q, state, n_actions, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)

            episode.append((state, action, reward))
            total_reward += reward

            state = next_state
            done = terminated or truncated

        episode_rewards.append(total_reward)

        # Q update (first-visit): iterate backward and keep overwriting, so
        # each (state, action) pair keeps the return from its earliest visit
        G = 0
        first_returns = {}
        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t
            first_returns[(state_t, action_t)] = G

        for (state_t, action_t), G_t in first_returns.items():
            returns_sum[state_t][action_t] += G_t
            returns_count[state_t][action_t] += 1
            Q[state_t][action_t] = (returns_sum[state_t][action_t] /
                                    returns_count[state_t][action_t])

        # Decay epsilon
        epsilon = max(0.01, epsilon * epsilon_decay)

        if (episode_num + 1) % 10000 == 0:
            avg_reward = np.mean(episode_rewards[-1000:])
            print(f"Episode {episode_num + 1}: avg_reward = {avg_reward:.3f}, "
                  f"epsilon = {epsilon:.4f}")

    # Final greedy policy
    policy = {}
    for state in Q:
        policy[state] = np.argmax(Q[state])

    return dict(Q), policy, episode_rewards


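# Sketch of a constant-step-size variant of the update above (an addition,
# not in the original). Instead of the exact sample mean kept by the
# returns_sum / returns_count tables, each first visit nudges Q toward the
# observed return: Q <- Q + alpha * (G - Q). This tracks nonstationary
# targets better; the alpha value below is arbitrary. `first_returns` is
# the {(state, action): G} dict built in the backward pass above.
def constant_alpha_mc_update(Q, first_returns, alpha=0.05):
    for (state_t, action_t), G_t in first_returns.items():
        Q[state_t][action_t] += alpha * (G_t - Q[state_t][action_t])

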
def mc_off_policy_control(env, n_episodes=100000, gamma=0.99):
    """
    Off-policy MC control (weighted importance sampling).

    Behavior policy: Ξ΅-greedy (exploration)
    Target policy: greedy (exploitation)

    Returns:
        Q: action-value function
        target_policy: learned target policy
        episode_rewards: total reward per episode
    """
    n_actions = env.action_space.n

    Q = defaultdict(lambda: np.zeros(n_actions))
    C = defaultdict(lambda: np.zeros(n_actions))  # cumulative sum of weights

    episode_rewards = []
    epsilon = 0.1  # epsilon of the behavior policy

    print("Starting MC off-policy control training...")
    for episode_num in range(n_episodes):
        # Generate an episode with the behavior policy (Ξ΅-greedy)
        episode = []
        state, _ = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = epsilon_greedy_policy(Q, state, n_actions, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            total_reward += reward
            state = next_state
            done = terminated or truncated

        episode_rewards.append(total_reward)

        G = 0
        W = 1.0  # importance sampling weight

        # Process the episode backward
        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t

            # Weighted importance sampling update
            C[state_t][action_t] += W
            Q[state_t][action_t] += (W / C[state_t][action_t] *
                                     (G - Q[state_t][action_t]))

            # Action under the target policy (greedy)
            target_action = np.argmax(Q[state_t])

            # If the behavior action deviates from the target policy,
            # pi(action_t | state_t) = 0, so stop
            if action_t != target_action:
                break

            # Update the importance ratio: the target policy is
            # deterministic, so pi(a|s) = 1, and past the break above the
            # behavior action equals the greedy action, so
            # b(a|s) = (1 - epsilon) + epsilon / n_actions
            b_prob = (1 - epsilon) + epsilon / n_actions
            W = W / b_prob

        if (episode_num + 1) % 10000 == 0:
            avg_reward = np.mean(episode_rewards[-1000:])
            print(f"Episode {episode_num + 1}: avg_reward = {avg_reward:.3f}")

    # Final greedy policy
    target_policy = {}
    for state in Q:
        target_policy[state] = np.argmax(Q[state])

    return dict(Q), target_policy, episode_rewards


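# Worked example of the weight update above (an addition; the numbers are
# hypothetical). With a deterministic greedy target policy, pi(a|s) = 1,
# and the Ξ΅-greedy behavior policy picks the greedy action with probability
# (1 - Ξ΅) + Ξ΅/|A|, so each matching step from the episode tail multiplies
# W by the reciprocal of that probability.
def _demo_importance_weight(epsilon=0.1, n_actions=2, n_greedy_steps=3):
    b_prob = (1 - epsilon) + epsilon / n_actions  # 0.95 for the defaults
    W = 1.0
    for _ in range(n_greedy_steps):
        W *= 1.0 / b_prob
    return W  # (1 / 0.95) ** 3 is about 1.166 for the defaults

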
def blackjack_example():
    """MC learning on the Blackjack environment."""
    print("\n" + "=" * 60)
    print("Blackjack example - MC On-Policy Control")
    print("=" * 60)

    env = gym.make('Blackjack-v1', sab=True)

    n_actions = env.action_space.n
    Q = defaultdict(lambda: np.zeros(n_actions))
    returns_sum = defaultdict(lambda: np.zeros(n_actions))
    returns_count = defaultdict(lambda: np.zeros(n_actions))

    n_episodes = 500000
    gamma = 1.0
    epsilon = 0.1

    wins = 0
    losses = 0
    draws = 0

    print(f"\nTraining for {n_episodes} episodes...")
    for ep in range(n_episodes):
        episode = []
        state, _ = env.reset()
        done = False

        # Generate an episode
        while not done:
            action = epsilon_greedy_policy(Q, state, n_actions, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            done = terminated or truncated

        # Record the outcome (in Blackjack the reward arrives on the
        # final step)
        final_reward = episode[-1][2]
        if final_reward == 1:
            wins += 1
        elif final_reward == -1:
            losses += 1
        else:
            draws += 1

        # Q update (first-visit, same overwrite-backward scheme as above)
        G = 0
        first_returns = {}
        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t
            first_returns[(state_t, action_t)] = G

        for (state_t, action_t), G_t in first_returns.items():
            returns_sum[state_t][action_t] += G_t
            returns_count[state_t][action_t] += 1
            Q[state_t][action_t] = (returns_sum[state_t][action_t] /
                                    returns_count[state_t][action_t])

        if (ep + 1) % 100000 == 0:
            win_rate = wins / (ep + 1)
            print(f"Episode {ep + 1}: win rate = {win_rate:.3f}")

    env.close()

    # Final statistics
    print("\nTraining complete!")
    print(f"Total episodes: {n_episodes}")
    print(f"Wins: {wins} ({wins/n_episodes*100:.1f}%)")
    print(f"Losses: {losses} ({losses/n_episodes*100:.1f}%)")
    print(f"Draws: {draws} ({draws/n_episodes*100:.1f}%)")
    print(f"Number of learned states: {len(Q)}")

    # Visualize the policy
    visualize_blackjack_policy(Q)

    return Q


def visualize_blackjack_policy(Q):
    """Print the learned Blackjack policy as a text grid."""
    print("\n" + "=" * 60)
    print("Learned Blackjack policy")
    print("=" * 60)
    print("H: Hit (take another card), S: Stick (keep the hand)")

    print("\n=== Without a usable ace ===")
    print("       Dealer card")
    print("Sum    A  2  3  4  5  6  7  8  9  10")
    print("-" * 50)

    for player_sum in range(21, 11, -1):
        row = f"{player_sum:2d}:   "
        for dealer in range(1, 11):
            state = (player_sum, dealer, False)
            if state in Q:
                action = np.argmax(Q[state])
                row += "H  " if action == 1 else "S  "
            else:
                row += "?  "
        print(row)

    print("\n=== With a usable ace ===")
    print("       Dealer card")
    print("Sum    A  2  3  4  5  6  7  8  9  10")
    print("-" * 50)

    for player_sum in range(21, 11, -1):
        row = f"{player_sum:2d}:   "
        for dealer in range(1, 11):
            state = (player_sum, dealer, True)
            if state in Q:
                action = np.argmax(Q[state])
                row += "H  " if action == 1 else "S  "
            else:
                row += "?  "
        print(row)


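# Optional heatmap rendering of the greedy policy (a sketch, not in the
# original; the filename and color choices are illustrative). Unvisited
# states are left as NaN and show up blank.
def plot_blackjack_policy_heatmap(Q, usable_ace=False,
                                  filename='blackjack_policy.png'):
    grid = np.full((10, 10), np.nan)  # rows: player sum 12-21, cols: dealer A-10
    for i, player_sum in enumerate(range(12, 22)):
        for j, dealer in enumerate(range(1, 11)):
            state = (player_sum, dealer, usable_ace)
            if state in Q:
                grid[i, j] = np.argmax(Q[state])
    plt.figure(figsize=(6, 5))
    plt.imshow(grid, origin='lower', cmap='coolwarm', vmin=0, vmax=1)
    plt.colorbar(label='0 = Stick, 1 = Hit')
    plt.xticks(range(10), ['A'] + [str(d) for d in range(2, 11)])
    plt.yticks(range(10), [str(s) for s in range(12, 22)])
    plt.xlabel('Dealer showing')
    plt.ylabel('Player sum')
    plt.title(f'Greedy policy (usable ace: {usable_ace})')
    plt.tight_layout()
    plt.savefig(filename, dpi=150)

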
def plot_learning_curve(episode_rewards, window=1000):
    """Plot the learning curve as a trailing moving average."""
    # Compute the moving average
    moving_avg = []
    for i in range(len(episode_rewards)):
        start = max(0, i - window + 1)
        moving_avg.append(np.mean(episode_rewards[start:i+1]))

    plt.figure(figsize=(12, 6))
    plt.plot(moving_avg, label=f'Moving Average (window={window})')
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.title('Monte Carlo Learning Curve')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('mc_learning_curve.png', dpi=150)
    print("Saved learning curve: mc_learning_curve.png")


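# Vectorized equivalent of the loop in plot_learning_curve (an addition,
# not in the original). A cumulative sum gives each point the mean of its
# trailing window in O(n) total instead of O(n * window).
def moving_average(x, window=1000):
    x = np.asarray(x, dtype=float)
    cumsum = np.concatenate(([0.0], np.cumsum(x)))
    idx = np.arange(len(x))
    start = np.maximum(0, idx - window + 1)
    return (cumsum[idx + 1] - cumsum[start]) / (idx - start + 1)

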
def compare_mc_methods():
    """Compare the MC methods on Blackjack."""
    print("=" * 60)
    print("Comparing Monte Carlo methods")
    print("=" * 60)

    env = gym.make('Blackjack-v1', sab=True)

    # 1. First-visit MC Prediction
    print("\n[1] First-visit MC Prediction (random policy)")
    print("-" * 60)

    def random_policy(state):
        return env.action_space.sample()

    V_first = first_visit_mc_prediction(env, random_policy, n_episodes=10000)
    print(f"Number of estimated states: {len(V_first)}")
    print(f"Sample state values: {list(V_first.items())[:5]}")

    # 2. Every-visit MC Prediction
    print("\n[2] Every-visit MC Prediction (random policy)")
    print("-" * 60)
    V_every = every_visit_mc_prediction(env, random_policy, n_episodes=10000)
    print(f"Number of estimated states: {len(V_every)}")

    # 3. On-policy MC Control
    print("\n[3] On-policy MC Control (Ξ΅-greedy)")
    print("-" * 60)
    Q_on, policy_on, rewards_on = mc_on_policy_control(
        env, n_episodes=50000, gamma=1.0, epsilon=0.1
    )
    print(f"Number of learned states: {len(Q_on)}")
    print(f"Final average reward: {np.mean(rewards_on[-1000:]):.3f}")

    # 4. Off-policy MC Control
    print("\n[4] Off-policy MC Control (Importance Sampling)")
    print("-" * 60)
    Q_off, policy_off, rewards_off = mc_off_policy_control(
        env, n_episodes=50000, gamma=1.0
    )
    print(f"Number of learned states: {len(Q_off)}")
    print(f"Final average reward: {np.mean(rewards_off[-1000:]):.3f}")

    env.close()

    # Compare learning curves
    plt.figure(figsize=(12, 6))

    window = 1000
    moving_avg_on = [np.mean(rewards_on[max(0, i-window+1):i+1])
                     for i in range(len(rewards_on))]
    moving_avg_off = [np.mean(rewards_off[max(0, i-window+1):i+1])
                      for i in range(len(rewards_off))]

    plt.plot(moving_avg_on, label='On-policy MC', alpha=0.7)
    plt.plot(moving_avg_off, label='Off-policy MC', alpha=0.7)
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.title('On-policy vs Off-policy MC Control')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('mc_comparison.png', dpi=150)
    print("\nSaved comparison plot: mc_comparison.png")

    return Q_on, policy_on, Q_off, policy_off


if __name__ == "__main__":
    # Compare the MC methods
    try:
        Q_on, policy_on, Q_off, policy_off = compare_mc_methods()

        # Blackjack example
        Q_blackjack = blackjack_example()

    except Exception as e:
        print(f"\nRun failed: {e}")
        print("Check that gymnasium is installed: pip install gymnasium")

    print("\n" + "=" * 60)
    print("Monte Carlo methods examples complete!")
    print("=" * 60)