1"""
2NumPy๋ก ๊ตฌํํ LeNet-5
3
4์๋ณธ ๋
ผ๋ฌธ: LeCun et al. (1998)
5"Gradient-Based Learning Applied to Document Recognition"
6"""
7
8import numpy as np
9from typing import Tuple, List
10from conv_numpy import Conv2dNumpy, im2col, col2im
11
12
class AvgPool2dNumpy:
    """Average pooling over square windows of a (N, C, H, W) batch."""

    def __init__(self, kernel_size: int = 2, stride: int = 2):
        self.kernel_size = kernel_size
        self.stride = stride
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """Average-pool the input; returns (N, C, H_out, W_out)."""
        batch, channels, height, width = input.shape
        k, s = self.kernel_size, self.stride

        out_h = (height - k) // s + 1
        out_w = (width - k) // s + 1

        # Unfold every window into a column, then average inside the window.
        windows = im2col(input, (k, k), s, padding=0)
        windows = windows.reshape(batch, channels, k * k, out_h * out_w)
        pooled = windows.mean(axis=2).reshape(batch, channels, out_h, out_w)

        # Only the input shape is needed to route gradients back.
        self.cache['input_shape'] = input.shape

        return pooled

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Spread each output gradient uniformly over its K*K input window."""
        batch, channels, out_h, out_w = grad_output.shape
        k = self.kernel_size
        area = k * k

        # Each window element receives 1/(K*K) of the output gradient.
        expanded = grad_output.reshape(batch, channels, 1, out_h * out_w)
        grad_cols = np.repeat(expanded, area, axis=2) / area
        grad_cols = grad_cols.reshape(batch, channels * area, out_h * out_w)

        return col2im(
            grad_cols, self.cache['input_shape'], (k, k),
            self.stride, padding=0
        )
60
61
class MaxPool2dNumpy:
    """Max pooling layer.

    The forward pass caches each window's argmax so the backward pass can
    route every output gradient to exactly one input element.
    """

    def __init__(self, kernel_size: int = 2, stride: int = 2):
        self.kernel_size = kernel_size
        self.stride = stride
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """Max-pool a (N, C, H, W) batch; returns (N, C, H_out, W_out)."""
        N, C, H, W = input.shape
        K = self.kernel_size
        S = self.stride

        H_out = (H - K) // S + 1
        W_out = (W - K) // S + 1

        # Unfold windows into columns: (N, C, K*K, H_out*W_out).
        col = im2col(input, (K, K), S, padding=0)
        col = col.reshape(N, C, K * K, H_out * W_out)

        # Max over each window, remembering where the max came from.
        max_idx = np.argmax(col, axis=2)
        output = np.max(col, axis=2)
        output = output.reshape(N, C, H_out, W_out)

        self.cache['input_shape'] = input.shape
        self.cache['max_idx'] = max_idx
        self.cache['col_shape'] = (N, C, K * K, H_out * W_out)

        return output

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Route each output gradient to the argmax position of its window."""
        N, C, H_out, W_out = grad_output.shape
        input_shape = self.cache['input_shape']
        max_idx = self.cache['max_idx']  # (N, C, H_out*W_out)
        K = self.kernel_size

        # Scatter gradients into the column layout. Vectorized replacement
        # for the previous quadruple Python loop: for every (n, c, pos),
        # place grad_output at row max_idx[n, c, pos] of column pos.
        grad_col = np.zeros((N, C, K * K, H_out * W_out))
        flat_grad = grad_output.reshape(N, C, 1, H_out * W_out)
        np.put_along_axis(grad_col, max_idx[:, :, None, :], flat_grad, axis=2)

        grad_col = grad_col.reshape(N, C * K * K, H_out * W_out)

        return col2im(
            grad_col, input_shape, (K, K),
            self.stride, padding=0
        )
120
121
class FlattenNumpy:
    """Collapses all non-batch dimensions into a single feature axis."""

    def __init__(self):
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """(N, *dims) -> (N, prod(dims)); remembers the shape for backward."""
        self.cache['input_shape'] = input.shape
        batch = input.shape[0]
        return input.reshape(batch, -1)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Restore the gradient to the cached input shape."""
        original_shape = self.cache['input_shape']
        return grad_output.reshape(original_shape)
134
135
class LinearNumpy:
    """Fully connected layer: y = x @ W.T + b.

    Weights use He (Kaiming) initialization: std = sqrt(2 / fan_in).
    """

    def __init__(self, in_features: int, out_features: int):
        std = np.sqrt(2.0 / in_features)
        self.weight = np.random.randn(out_features, in_features) * std
        self.bias = np.zeros(out_features)

        # Parameter gradients, filled in by backward().
        self.weight_grad = None
        self.bias_grad = None
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """Affine map of a (N, in_features) batch to (N, out_features)."""
        self.cache['input'] = input
        return input @ self.weight.T + self.bias

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Store parameter gradients and return dL/dinput."""
        x = self.cache['input']

        self.weight_grad = grad_output.T @ x
        self.bias_grad = grad_output.sum(axis=0)

        return grad_output @ self.weight

    def update(self, lr: float):
        """One in-place SGD step on weight and bias."""
        self.weight -= lr * self.weight_grad
        self.bias -= lr * self.bias_grad
170
171
class TanhNumpy:
    """Elementwise tanh activation."""

    def __init__(self):
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """Apply tanh; the output is cached for the backward pass."""
        activated = np.tanh(input)
        self.cache['output'] = activated
        return activated

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """d/dx tanh(x) = 1 - tanh(x)^2, computed from the cached output."""
        y = self.cache['output']
        return grad_output * (1 - y * y)
186
187
class ReLUNumpy:
    """Elementwise ReLU activation."""

    def __init__(self):
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """max(0, x); the input is cached to build the backward mask."""
        self.cache['input'] = input
        return np.maximum(0, input)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Pass gradients only where the input was strictly positive."""
        mask = self.cache['input'] > 0
        return grad_output * mask
201
202
class SoftmaxCrossEntropyNumpy:
    """Combined softmax + cross-entropy loss with a fused gradient."""

    def __init__(self):
        self.cache = {}

    def forward(self, logits: np.ndarray, labels: np.ndarray) -> float:
        """Mean cross-entropy of softmax(logits) against integer labels.

        Args:
            logits: (N, num_classes) raw scores.
            labels: (N,) integer class indices.

        Returns:
            Scalar mean loss.
        """
        N = logits.shape[0]

        # Numerically stable softmax: shift by the row-wise max first.
        stabilized = logits - logits.max(axis=1, keepdims=True)
        exps = np.exp(stabilized)
        probs = exps / exps.sum(axis=1, keepdims=True)

        # Negative log-likelihood of the true classes (epsilon avoids log 0).
        picked = probs[np.arange(N), labels]
        loss = -np.mean(np.log(picked + 1e-10))

        self.cache['probs'] = probs
        self.cache['labels'] = labels

        return loss

    def backward(self) -> np.ndarray:
        """Gradient of the mean loss w.r.t. logits: (softmax - one_hot) / N."""
        probs = self.cache['probs']
        labels = self.cache['labels']
        N = probs.shape[0]

        grad = probs.copy()
        grad[np.arange(N), labels] -= 1

        return grad / N
246
247
class LeNet5Numpy:
    """LeNet-5 implemented with NumPy layers.

    Architecture:
        Input (1, 32, 32)
        -> Conv1 (6 filters, 5x5)   -> activation -> AvgPool 2x2
        -> Conv2 (16 filters, 5x5)  -> activation -> AvgPool 2x2
        -> Conv3 (120 filters, 5x5) -> activation   (spatial size becomes 1x1)
        -> FC (120 -> 84)           -> activation
        -> FC (84 -> num_classes)
    """

    def __init__(self, num_classes: int = 10, use_relu: bool = False):
        """
        Args:
            num_classes: number of output classes.
            use_relu: use ReLU when True, otherwise Tanh (as in the paper).
        """
        activation_cls = ReLUNumpy if use_relu else TanhNumpy

        # Stage 1: conv + pool.
        self.conv1 = Conv2dNumpy(1, 6, kernel_size=5, stride=1, padding=0)
        self.act1 = activation_cls()
        self.pool1 = AvgPool2dNumpy(kernel_size=2, stride=2)

        # Stage 2: conv + pool.
        self.conv2 = Conv2dNumpy(6, 16, kernel_size=5, stride=1, padding=0)
        self.act2 = activation_cls()
        self.pool2 = AvgPool2dNumpy(kernel_size=2, stride=2)

        # Stage 3: conv that reduces the feature map to 1x1.
        self.conv3 = Conv2dNumpy(16, 120, kernel_size=5, stride=1, padding=0)
        self.act3 = activation_cls()

        # Classifier head.
        self.flatten = FlattenNumpy()
        self.fc1 = LinearNumpy(120, 84)
        self.act4 = activation_cls()
        self.fc2 = LinearNumpy(84, num_classes)

        # Loss function.
        self.criterion = SoftmaxCrossEntropyNumpy()

        # Every stage in forward order; backward walks the list reversed.
        self._pipeline = [
            self.conv1, self.act1, self.pool1,
            self.conv2, self.act2, self.pool2,
            self.conv3, self.act3,
            self.flatten,
            self.fc1, self.act4, self.fc2,
        ]

        # Layers that own trainable parameters (used by train_step's update).
        self.layers = [
            self.conv1, self.conv2, self.conv3,
            self.fc1, self.fc2
        ]

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Map (N, 1, 32, 32) images to (N, num_classes) logits."""
        for stage in self._pipeline:
            x = stage.forward(x)
        return x

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Propagate the logit gradient back through every stage."""
        for stage in reversed(self._pipeline):
            grad = stage.backward(grad)
        return grad

    def train_step(
        self,
        images: np.ndarray,
        labels: np.ndarray,
        lr: float = 0.01
    ) -> Tuple[float, float]:
        """Run a single SGD step on one batch.

        Returns:
            (loss, accuracy) for this batch.
        """
        # Forward + loss.
        logits = self.forward(images)
        loss = self.criterion.forward(logits, labels)

        # Batch accuracy from the argmax predictions.
        predictions = np.argmax(logits, axis=1)
        accuracy = np.mean(predictions == labels)

        # Backward through the loss and every stage.
        self.backward(self.criterion.backward())

        # Parameter update.
        for layer in self.layers:
            layer.update(lr)

        return loss, accuracy

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Return the predicted class index for each image."""
        return np.argmax(self.forward(images), axis=1)
384
385
def load_mnist_subset(
    n_train: int = 1000,
    n_test: int = 200,
    seed=None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Generate a stand-in dataset shaped like MNIST (demo only).

    A real pipeline would load MNIST via torchvision or keras; here random
    data with the right shapes/dtypes is produced so the training loop runs.

    Args:
        n_train: number of training samples (default 1000, as before).
        n_test: number of test samples (default 200, as before).
        seed: optional int seed for reproducible random data.

    Returns:
        (X_train, y_train, X_test, y_test) where images have shape
        (n, 1, 32, 32) float32 and labels are ints in [0, 10).
    """
    print("Note: ์ค์  MNIST ๋์  ๋๋ค ๋ฐ์ดํฐ ์ฌ์ฉ")

    if seed is not None:
        np.random.seed(seed)

    # Training data.
    X_train = np.random.randn(n_train, 1, 32, 32).astype(np.float32)
    y_train = np.random.randint(0, 10, n_train)

    # Test data.
    X_test = np.random.randn(n_test, 1, 32, 32).astype(np.float32)
    y_test = np.random.randint(0, 10, n_test)

    return X_train, y_train, X_test, y_test
404
405
def train_lenet():
    """Train LeNet-5 on the demo dataset, reporting metrics every epoch."""
    print("=== LeNet-5 NumPy Training ===\n")

    # Data.
    X_train, y_train, X_test, y_test = load_mnist_subset()
    print(f"Train: {X_train.shape}, Test: {X_test.shape}")

    # Model.
    model = LeNet5Numpy(num_classes=10, use_relu=True)

    # Hyperparameters.
    epochs = 5
    batch_size = 32
    lr = 0.01
    num_batches = len(X_train) // batch_size

    for epoch in range(epochs):
        # Reshuffle the training set every epoch.
        order = np.random.permutation(len(X_train))
        X_train, y_train = X_train[order], y_train[order]

        epoch_loss = 0.0
        epoch_acc = 0.0

        for batch_idx in range(num_batches):
            lo = batch_idx * batch_size
            hi = lo + batch_size

            loss, acc = model.train_step(X_train[lo:hi], y_train[lo:hi], lr)

            epoch_loss += loss
            epoch_acc += acc

            # Periodic progress report.
            if (batch_idx + 1) % 10 == 0:
                print(f" Batch {batch_idx+1}/{num_batches}, "
                      f"Loss: {loss:.4f}, Acc: {acc:.4f}")

        avg_loss = epoch_loss / num_batches
        avg_acc = epoch_acc / num_batches

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f" Train Loss: {avg_loss:.4f}, Acc: {avg_acc:.4f}")

        # Evaluate on the held-out set after each epoch.
        predictions = model.predict(X_test)
        test_acc = np.mean(predictions == y_test)
        print(f" Test Acc: {test_acc:.4f}")
        print()

    print("Training complete!")
462
463
464if __name__ == "__main__":
465 train_lenet()