1"""
2λͺ¬ν
μΉ΄λ₯Όλ‘ λ°©λ² (Monte Carlo Methods) ꡬν
3- First-visit MC Prediction
4- Every-visit MC Prediction
5- MC Control (Exploring Starts)
6- On-policy MC Control (Ξ΅-greedy)
7- Off-policy MC (Importance Sampling)
8"""
9import numpy as np
10import matplotlib.pyplot as plt
11from collections import defaultdict
12import gymnasium as gym
13
14
def calculate_returns(episode, gamma=0.99):
    """
    Compute the return at each timestep of an episode.

    Args:
        episode: list of (state, action, reward) tuples
        gamma: discount factor

    Returns:
        returns: list of (state, G) pairs, the return at each timestep
    """
    G = 0  # initialize the return
    returns = []

    # Sweep backwards so each return is computed in O(1) from the next one
    for t in range(len(episode) - 1, -1, -1):
        state, action, reward = episode[t]
        G = reward + gamma * G  # discounted return
        returns.append((state, G))

    returns.reverse()  # restore chronological order
    return returns


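# A minimal worked example of calculate_returns (illustrative values only,
# using gamma=0.5 for easy arithmetic): rewards [1, 0, 2] give
# G_2 = 2, G_1 = 0 + 0.5 * 2 = 1, G_0 = 1 + 0.5 * 1 = 1.5:
#
#   calculate_returns([("s0", 0, 1), ("s1", 1, 0), ("s2", 0, 2)], gamma=0.5)
#   # -> [("s0", 1.5), ("s1", 1.0), ("s2", 2.0)]

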
def first_visit_mc_prediction(env, policy, n_episodes=10000, gamma=0.99):
    """
    First-visit MC policy evaluation.

    Args:
        env: Gymnasium environment
        policy: policy function, policy(state) -> action
        n_episodes: number of episodes
        gamma: discount factor

    Returns:
        V: state-value function
    """
    # Sum of returns and visit count per state
    returns_sum = defaultdict(float)
    returns_count = defaultdict(int)
    V = defaultdict(float)

    for episode_num in range(n_episodes):
        # Generate an episode
        episode = []
        state, _ = env.reset()
        done = False

        while not done:
            action = policy(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            done = terminated or truncated

        # First-visit: sweep backwards, overwriting so that the value kept
        # for each state is the return from its earliest (first) visit
        G = 0
        first_visit_return = {}

        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t
            first_visit_return[state_t] = G  # earlier visits overwrite later ones

        for state_t, G_t in first_visit_return.items():
            returns_sum[state_t] += G_t
            returns_count[state_t] += 1
            V[state_t] = returns_sum[state_t] / returns_count[state_t]

        if (episode_num + 1) % 2000 == 0:
            print(f"Episode {episode_num + 1}/{n_episodes}")

    return dict(V)


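# A minimal sketch of an equivalent incremental form (a stylistic
# alternative, not used above): the running average can be maintained
# without storing returns_sum, as in Sutton & Barto:
#
#   returns_count[s] += 1
#   V[s] += (G - V[s]) / returns_count[s]

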
def every_visit_mc_prediction(env, policy, n_episodes=10000, gamma=0.99):
    """
    Every-visit MC policy evaluation.

    Every occurrence of a state in an episode is counted.
    """
    returns_sum = defaultdict(float)
    returns_count = defaultdict(int)
    V = defaultdict(float)

    for episode_num in range(n_episodes):
        episode = []
        state, _ = env.reset()
        done = False

        while not done:
            action = policy(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            done = terminated or truncated

        G = 0

        # Every-visit: update on every occurrence of a state
        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]

            G = gamma * G + reward_t

            returns_sum[state_t] += G
            returns_count[state_t] += 1
            V[state_t] = returns_sum[state_t] / returns_count[state_t]

        if (episode_num + 1) % 2000 == 0:
            print(f"Episode {episode_num + 1}/{n_episodes}")

    return dict(V)


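# Note (standard result, stated here for context): first-visit MC gives
# unbiased estimates of V(s), while every-visit MC is biased for finite
# sample sizes; both converge to the true value function as the number of
# episodes grows.

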
def epsilon_greedy_policy(Q, state, n_actions, epsilon=0.1):
    """
    ε-greedy action selection.

    Args:
        Q: action-value function
        state: current state
        n_actions: number of actions
        epsilon: exploration probability

    Returns:
        action: the selected action
    """
    if np.random.random() < epsilon:
        # Explore: random action
        return np.random.randint(n_actions)
    else:
        # Exploit: greedy action
        return np.argmax(Q[state])


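# Under this scheme the behavior probabilities are (a standard identity,
# noted because mc_off_policy_control below relies on it):
#   b(a|s) = 1 - ε + ε/|A|  for the greedy action
#   b(a|s) = ε/|A|          for every other action
# e.g. ε = 0.1, |A| = 2  ->  b(greedy) = 0.95, b(other) = 0.05.

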
def mc_on_policy_control(env, n_episodes=100000, gamma=0.99,
                         epsilon=0.1, epsilon_decay=0.9999):
    """
    On-policy MC control (ε-greedy).

    Args:
        env: Gymnasium environment
        n_episodes: number of episodes
        gamma: discount factor
        epsilon: exploration rate
        epsilon_decay: multiplicative decay applied to epsilon per episode

    Returns:
        Q: action-value function
        policy: learned policy
        episode_rewards: reward per episode
    """
    n_actions = env.action_space.n

    Q = defaultdict(lambda: np.zeros(n_actions))
    returns_sum = defaultdict(lambda: np.zeros(n_actions))
    returns_count = defaultdict(lambda: np.zeros(n_actions))

    episode_rewards = []

    print("Starting MC on-policy control training...")
    for episode_num in range(n_episodes):
        episode = []
        state, _ = env.reset()
        done = False
        total_reward = 0

        # Generate an episode with the ε-greedy policy
        while not done:
            action = epsilon_greedy_policy(Q, state, n_actions, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)

            episode.append((state, action, reward))
            total_reward += reward

            state = next_state
            done = terminated or truncated

        episode_rewards.append(total_reward)

        # Q update (first-visit): sweep backwards, overwriting so that each
        # (state, action) pair keeps the return from its first visit
        G = 0
        first_visit_return = {}

        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t
            first_visit_return[(state_t, action_t)] = G

        for (state_t, action_t), G_t in first_visit_return.items():
            returns_sum[state_t][action_t] += G_t
            returns_count[state_t][action_t] += 1
            Q[state_t][action_t] = (returns_sum[state_t][action_t] /
                                    returns_count[state_t][action_t])

        # Decay epsilon
        epsilon = max(0.01, epsilon * epsilon_decay)

        if (episode_num + 1) % 10000 == 0:
            avg_reward = np.mean(episode_rewards[-1000:])
            print(f"Episode {episode_num + 1}: avg_reward = {avg_reward:.3f}, "
                  f"epsilon = {epsilon:.4f}")

    # Final greedy policy
    policy = {}
    for state in Q:
        policy[state] = np.argmax(Q[state])

    return dict(Q), policy, episode_rewards


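# A minimal usage sketch (assuming gymnasium's Blackjack-v1 is available,
# as in compare_mc_methods below):
#
#   env = gym.make('Blackjack-v1', sab=True)
#   Q, policy, rewards = mc_on_policy_control(env, n_episodes=50000, gamma=1.0)
#   env.close()

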
def mc_off_policy_control(env, n_episodes=100000, gamma=0.99):
    """
    Off-policy MC control (weighted importance sampling).

    Behavior policy: ε-greedy (exploration)
    Target policy: greedy (exploitation)

    Returns:
        Q: action-value function
        target_policy: the learned target policy
        episode_rewards: reward per episode
    """
    n_actions = env.action_space.n

    Q = defaultdict(lambda: np.zeros(n_actions))
    C = defaultdict(lambda: np.zeros(n_actions))  # cumulative sum of weights

    episode_rewards = []
    epsilon = 0.1  # epsilon of the behavior policy

    print("Starting MC off-policy control training...")
    for episode_num in range(n_episodes):
        # Generate an episode with the behavior policy (ε-greedy)
        episode = []
        state, _ = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = epsilon_greedy_policy(Q, state, n_actions, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            total_reward += reward
            state = next_state
            done = terminated or truncated

        episode_rewards.append(total_reward)

        G = 0
        W = 1.0  # importance sampling weight

        # Process the episode backwards
        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t

            # Weighted importance sampling update
            C[state_t][action_t] += W
            Q[state_t][action_t] += (W / C[state_t][action_t] *
                                     (G - Q[state_t][action_t]))

            # Action of the (greedy) target policy in this state
            target_action = np.argmax(Q[state_t])

            # If the taken action differs from the target policy, the
            # importance ratio for the rest of the episode is zero: stop
            if action_t != target_action:
                break

            # Update the importance ratio. The target policy is
            # deterministic, so π(a|s) = 1; under ε-greedy the behavior
            # probability of the greedy action is (1 - ε) + ε/|A|
            b_prob = (1 - epsilon) + epsilon / n_actions
            W = W / b_prob

        if (episode_num + 1) % 10000 == 0:
            avg_reward = np.mean(episode_rewards[-1000:])
            print(f"Episode {episode_num + 1}: avg_reward = {avg_reward:.3f}")

    # Final greedy policy
    target_policy = {}
    for state in Q:
        target_policy[state] = np.argmax(Q[state])

    return dict(Q), target_policy, episode_rewards


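# The update above is the incremental form of weighted importance sampling
# (Sutton & Barto, Sec. 5.7):
#   C(s,a) <- C(s,a) + W
#   Q(s,a) <- Q(s,a) + (W / C(s,a)) * (G - Q(s,a))
# which keeps Q(s,a) equal to the weighted average sum(W_i * G_i) / sum(W_i)
# over all returns observed for (s,a) so far.

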
def blackjack_example():
    """MC learning in the Blackjack environment."""
    print("\n" + "=" * 60)
    print("Blackjack example - MC On-Policy Control")
    print("=" * 60)

    env = gym.make('Blackjack-v1', sab=True)

    n_actions = env.action_space.n
    Q = defaultdict(lambda: np.zeros(n_actions))
    returns_sum = defaultdict(lambda: np.zeros(n_actions))
    returns_count = defaultdict(lambda: np.zeros(n_actions))

    n_episodes = 500000
    gamma = 1.0
    epsilon = 0.1

    wins = 0
    losses = 0
    draws = 0

    print(f"\nTraining for {n_episodes} episodes...")
    for ep in range(n_episodes):
        episode = []
        state, _ = env.reset()
        done = False

        # Generate an episode
        while not done:
            action = epsilon_greedy_policy(Q, state, n_actions, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            done = terminated or truncated

        # Record the outcome
        final_reward = episode[-1][2]
        if final_reward == 1:
            wins += 1
        elif final_reward == -1:
            losses += 1
        else:
            draws += 1

        # Q update (first-visit, same backward-overwrite scheme as above)
        G = 0
        first_visit_return = {}

        for t in range(len(episode) - 1, -1, -1):
            state_t, action_t, reward_t = episode[t]
            G = gamma * G + reward_t
            first_visit_return[(state_t, action_t)] = G

        for (state_t, action_t), G_t in first_visit_return.items():
            returns_sum[state_t][action_t] += G_t
            returns_count[state_t][action_t] += 1
            Q[state_t][action_t] = (returns_sum[state_t][action_t] /
                                    returns_count[state_t][action_t])

        if (ep + 1) % 100000 == 0:
            win_rate = wins / (ep + 1)
            print(f"Episode {ep + 1}: win rate = {win_rate:.3f}")

    env.close()

    # Final statistics
    print("\nTraining complete!")
    print(f"Total episodes: {n_episodes}")
    print(f"Wins: {wins} ({wins/n_episodes*100:.1f}%)")
    print(f"Losses: {losses} ({losses/n_episodes*100:.1f}%)")
    print(f"Draws: {draws} ({draws/n_episodes*100:.1f}%)")
    print(f"Learned state-action pairs: {len(Q)}")

    # Visualize the policy
    visualize_blackjack_policy(Q)

    return Q


def visualize_blackjack_policy(Q):
    """Visualize the learned Blackjack policy."""
    print("\n" + "=" * 60)
    print("Learned Blackjack policy")
    print("=" * 60)
    print("H: Hit (take a card), S: Stick (keep hand)")

    print("\n=== Without a usable ace ===")
    print("     Dealer card")
    print("Sum A 2 3 4 5 6 7 8 9 10")
    print("-" * 50)

    for player_sum in range(21, 11, -1):
        row = f"{player_sum:2d}: "
        for dealer in range(1, 11):
            state = (player_sum, dealer, False)
            if state in Q:
                action = np.argmax(Q[state])
                row += "H " if action == 1 else "S "
            else:
                row += "? "
        print(row)

    print("\n=== With a usable ace ===")
    print("     Dealer card")
    print("Sum A 2 3 4 5 6 7 8 9 10")
    print("-" * 50)

    for player_sum in range(21, 11, -1):
        row = f"{player_sum:2d}: "
        for dealer in range(1, 11):
            state = (player_sum, dealer, True)
            if state in Q:
                action = np.argmax(Q[state])
                row += "H " if action == 1 else "S "
            else:
                row += "? "
        print(row)


def plot_learning_curve(episode_rewards, window=1000):
    """Plot the learning curve."""
    # Trailing moving average
    moving_avg = []
    for i in range(len(episode_rewards)):
        start = max(0, i - window + 1)
        moving_avg.append(np.mean(episode_rewards[start:i+1]))

    plt.figure(figsize=(12, 6))
    plt.plot(moving_avg, label=f'Moving Average (window={window})')
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.title('Monte Carlo Learning Curve')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('mc_learning_curve.png', dpi=150)
    print("Learning curve saved: mc_learning_curve.png")


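# A vectorized alternative sketch (a stylistic option, not used above): for
# full windows NumPy computes the same trailing average much faster, at the
# cost of dropping the warm-up region at the start:
#
#   kernel = np.ones(window) / window
#   smoothed = np.convolve(episode_rewards, kernel, mode='valid')

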
def compare_mc_methods():
    """Compare the MC methods."""
    print("=" * 60)
    print("Monte Carlo methods comparison")
    print("=" * 60)

    env = gym.make('Blackjack-v1', sab=True)

    # 1. First-visit MC Prediction
    print("\n[1] First-visit MC Prediction (random policy)")
    print("-" * 60)

    def random_policy(state):
        return env.action_space.sample()

    V_first = first_visit_mc_prediction(env, random_policy, n_episodes=10000)
    print(f"Number of estimated states: {len(V_first)}")
    print(f"Sample state values: {list(V_first.items())[:5]}")

    # 2. Every-visit MC Prediction
    print("\n[2] Every-visit MC Prediction (random policy)")
    print("-" * 60)
    V_every = every_visit_mc_prediction(env, random_policy, n_episodes=10000)
    print(f"Number of estimated states: {len(V_every)}")

    # 3. On-policy MC Control
    print("\n[3] On-policy MC Control (ε-greedy)")
    print("-" * 60)
    Q_on, policy_on, rewards_on = mc_on_policy_control(
        env, n_episodes=50000, gamma=1.0, epsilon=0.1
    )
    print(f"Learned state-action pairs: {len(Q_on)}")
    print(f"Final average reward: {np.mean(rewards_on[-1000:]):.3f}")

    # 4. Off-policy MC Control
    print("\n[4] Off-policy MC Control (Importance Sampling)")
    print("-" * 60)
    Q_off, policy_off, rewards_off = mc_off_policy_control(
        env, n_episodes=50000, gamma=1.0
    )
    print(f"Learned state-action pairs: {len(Q_off)}")
    print(f"Final average reward: {np.mean(rewards_off[-1000:]):.3f}")

    env.close()

    # Compare learning curves
    plt.figure(figsize=(12, 6))

    window = 1000
    moving_avg_on = [np.mean(rewards_on[max(0, i-window+1):i+1])
                     for i in range(len(rewards_on))]
    moving_avg_off = [np.mean(rewards_off[max(0, i-window+1):i+1])
                      for i in range(len(rewards_off))]

    plt.plot(moving_avg_on, label='On-policy MC', alpha=0.7)
    plt.plot(moving_avg_off, label='Off-policy MC', alpha=0.7)
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.title('On-policy vs Off-policy MC Control')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('mc_comparison.png', dpi=150)
    print("\nComparison plot saved: mc_comparison.png")

    return Q_on, policy_on, Q_off, policy_off


if __name__ == "__main__":
    # Compare MC methods
    try:
        Q_on, policy_on, Q_off, policy_off = compare_mc_methods()

        # Blackjack example
        Q_blackjack = blackjack_example()

    except Exception as e:
        print(f"\nRun failed: {e}")
        print("Make sure the gymnasium package is installed: pip install gymnasium")

    print("\n" + "=" * 60)
    print("Monte Carlo methods examples complete!")
    print("=" * 60)