# lenet_numpy.py
"""
LeNet-5 implemented in NumPy.

Original paper: LeCun et al. (1998),
"Gradient-Based Learning Applied to Document Recognition"
"""

import numpy as np
from typing import Tuple, List

from conv_numpy import Conv2dNumpy, im2col, col2im


 13class AvgPool2dNumpy:
 14    """Average Pooling Layer"""
 15
 16    def __init__(self, kernel_size: int = 2, stride: int = 2):
 17        self.kernel_size = kernel_size
 18        self.stride = stride
 19        self.cache = {}
 20
 21    def forward(self, input: np.ndarray) -> np.ndarray:
 22        """Forward pass"""
 23        N, C, H, W = input.shape
 24        K = self.kernel_size
 25        S = self.stride
 26
 27        H_out = (H - K) // S + 1
 28        W_out = (W - K) // S + 1
 29
 30        # im2col ๋ณ€ํ™˜
 31        col = im2col(input, (K, K), S, padding=0)
 32        col = col.reshape(N, C, K * K, H_out * W_out)
 33
 34        # ํ‰๊ท 
 35        output = np.mean(col, axis=2)
 36        output = output.reshape(N, C, H_out, W_out)
 37
 38        # ์บ์‹œ
 39        self.cache['input_shape'] = input.shape
 40
 41        return output
 42
 43    def backward(self, grad_output: np.ndarray) -> np.ndarray:
 44        """Backward pass"""
 45        N, C, H_out, W_out = grad_output.shape
 46        input_shape = self.cache['input_shape']
 47        K = self.kernel_size
 48
 49        # ๊ฐ ์›์†Œ์— 1/(K*K) ๋งŒํผ ๋ถ„๋ฐฐ
 50        grad_output_expanded = grad_output.reshape(N, C, 1, H_out * W_out)
 51        grad_col = np.repeat(grad_output_expanded, K * K, axis=2) / (K * K)
 52        grad_col = grad_col.reshape(N, C * K * K, H_out * W_out)
 53
 54        grad_input = col2im(
 55            grad_col, input_shape, (K, K),
 56            self.stride, padding=0
 57        )
 58
 59        return grad_input
 60
 61
 62class MaxPool2dNumpy:
 63    """Max Pooling Layer"""
 64
 65    def __init__(self, kernel_size: int = 2, stride: int = 2):
 66        self.kernel_size = kernel_size
 67        self.stride = stride
 68        self.cache = {}
 69
 70    def forward(self, input: np.ndarray) -> np.ndarray:
 71        """Forward pass"""
 72        N, C, H, W = input.shape
 73        K = self.kernel_size
 74        S = self.stride
 75
 76        H_out = (H - K) // S + 1
 77        W_out = (W - K) // S + 1
 78
 79        # im2col
 80        col = im2col(input, (K, K), S, padding=0)
 81        col = col.reshape(N, C, K * K, H_out * W_out)
 82
 83        # Max
 84        max_idx = np.argmax(col, axis=2)
 85        output = np.max(col, axis=2)
 86        output = output.reshape(N, C, H_out, W_out)
 87
 88        # ์บ์‹œ
 89        self.cache['input_shape'] = input.shape
 90        self.cache['max_idx'] = max_idx
 91        self.cache['col_shape'] = (N, C, K * K, H_out * W_out)
 92
 93        return output
 94
 95    def backward(self, grad_output: np.ndarray) -> np.ndarray:
 96        """Backward pass"""
 97        N, C, H_out, W_out = grad_output.shape
 98        input_shape = self.cache['input_shape']
 99        max_idx = self.cache['max_idx']
100        K = self.kernel_size
101
102        # Max ์œ„์น˜์—๋งŒ gradient ์ „๋‹ฌ
103        grad_col = np.zeros((N, C, K * K, H_out * W_out))
104
105        for n in range(N):
106            for c in range(C):
107                for h in range(H_out):
108                    for w in range(W_out):
109                        idx = max_idx[n, c, h * W_out + w]
110                        grad_col[n, c, idx, h * W_out + w] = grad_output[n, c, h, w]
111
112        grad_col = grad_col.reshape(N, C * K * K, H_out * W_out)
113
114        grad_input = col2im(
115            grad_col, input_shape, (K, K),
116            self.stride, padding=0
117        )
118
119        return grad_input
120
121
class FlattenNumpy:
    """Collapses all non-batch dimensions into a single feature axis."""

    def __init__(self):
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """(N, ...) -> (N, prod(...)); caches the shape for backward."""
        self.cache['input_shape'] = input.shape
        batch = input.shape[0]
        return input.reshape(batch, -1)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Restore the gradient to the cached input shape."""
        original_shape = self.cache['input_shape']
        return grad_output.reshape(original_shape)


class LinearNumpy:
    """Fully connected layer: Y = X W^T + b."""

    def __init__(self, in_features: int, out_features: int):
        """
        Args:
            in_features: input feature dimension
            out_features: output feature dimension
        """
        # He (Kaiming) initialization: std = sqrt(2 / fan_in).
        # NOTE: the original comment called this "Xavier", but sqrt(2/fan_in)
        # is the He scheme (appropriate for ReLU); Xavier would use
        # sqrt(1/fan_in) or sqrt(2/(fan_in+fan_out)).
        scale = np.sqrt(2.0 / in_features)
        self.weight = np.random.randn(out_features, in_features) * scale
        self.bias = np.zeros(out_features)

        self.weight_grad = None
        self.bias_grad = None
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """Y = XW^T + b for input of shape (N, in_features)."""
        self.cache['input'] = input
        return input @ self.weight.T + self.bias

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Compute parameter gradients and return dL/dX.

        dL/dW = dL/dY^T @ X,  dL/db = sum over batch,  dL/dX = dL/dY @ W.
        """
        input = self.cache['input']

        self.weight_grad = grad_output.T @ input
        self.bias_grad = np.sum(grad_output, axis=0)

        grad_input = grad_output @ self.weight

        return grad_input

    def update(self, lr: float):
        """Vanilla SGD step on weight and bias."""
        self.weight -= lr * self.weight_grad
        self.bias -= lr * self.bias_grad


class TanhNumpy:
    """Hyperbolic tangent activation."""

    def __init__(self):
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """Apply tanh elementwise; cache the output for backward."""
        activated = np.tanh(input)
        self.cache['output'] = activated
        return activated

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """d/dx tanh(x) = 1 - tanh(x)^2, computed from the cached output."""
        y = self.cache['output']
        return grad_output * (1 - y ** 2)


class ReLUNumpy:
    """Rectified linear unit activation."""

    def __init__(self):
        self.cache = {}

    def forward(self, input: np.ndarray) -> np.ndarray:
        """max(0, x) elementwise; cache the input for the backward mask."""
        self.cache['input'] = input
        return np.maximum(0, input)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Pass gradient through only where the input was strictly positive."""
        mask = self.cache['input'] > 0
        return grad_output * mask


class SoftmaxCrossEntropyNumpy:
    """Fused softmax + cross-entropy loss."""

    def __init__(self):
        self.cache = {}

    def forward(self, logits: np.ndarray, labels: np.ndarray) -> float:
        """Mean cross-entropy of softmax(logits) against integer labels.

        Args:
            logits: (N, num_classes) raw scores
            labels: (N,) integer class indices

        Returns:
            Scalar mean loss over the batch.
        """
        batch = logits.shape[0]

        # Numerically stable softmax: subtract the row-wise max first.
        shifted = logits - np.max(logits, axis=1, keepdims=True)
        exp_scores = np.exp(shifted)
        probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

        # Negative log-likelihood of the true class; epsilon avoids log(0).
        picked = probs[np.arange(batch), labels]
        loss = -np.mean(np.log(picked + 1e-10))

        # Cache for the backward pass.
        self.cache['probs'] = probs
        self.cache['labels'] = labels

        return loss

    def backward(self) -> np.ndarray:
        """Gradient of the mean loss w.r.t. logits: (softmax - one_hot) / N."""
        probs = self.cache['probs']
        labels = self.cache['labels']
        batch = probs.shape[0]

        grad = probs.copy()
        grad[np.arange(batch), labels] -= 1
        grad /= batch

        return grad


class LeNet5Numpy:
    """
    LeNet-5 implemented with NumPy layers.

    Architecture:
    Input (1, 32, 32)
    -> Conv1 (6, 5, 5) -> activation -> AvgPool
    -> Conv2 (16, 5, 5) -> activation -> AvgPool
    -> Conv3 (120, 5, 5) -> activation
    -> FC1 (120 -> 84) -> activation
    -> FC2 (84 -> num_classes)
    """

    def __init__(self, num_classes: int = 10, use_relu: bool = False):
        """
        Args:
            num_classes: number of output classes
            use_relu: use ReLU if True, Tanh (as in the original paper) if False
        """
        Activation = ReLUNumpy if use_relu else TanhNumpy

        # Stage 1: Conv + Pool
        self.conv1 = Conv2dNumpy(1, 6, kernel_size=5, stride=1, padding=0)
        self.act1 = Activation()
        self.pool1 = AvgPool2dNumpy(kernel_size=2, stride=2)

        # Stage 2: Conv + Pool
        self.conv2 = Conv2dNumpy(6, 16, kernel_size=5, stride=1, padding=0)
        self.act2 = Activation()
        self.pool2 = AvgPool2dNumpy(kernel_size=2, stride=2)

        # Stage 3: Conv (spatial size collapses to 1x1)
        self.conv3 = Conv2dNumpy(16, 120, kernel_size=5, stride=1, padding=0)
        self.act3 = Activation()

        # Classifier head
        self.flatten = FlattenNumpy()
        self.fc1 = LinearNumpy(120, 84)
        self.act4 = Activation()
        self.fc2 = LinearNumpy(84, num_classes)

        # Loss
        self.criterion = SoftmaxCrossEntropyNumpy()

        # Ordered pipeline driving forward/backward traversal.
        self._pipeline = [
            self.conv1, self.act1, self.pool1,
            self.conv2, self.act2, self.pool2,
            self.conv3, self.act3,
            self.flatten,
            self.fc1, self.act4, self.fc2,
        ]

        # Parameterized layers (used for SGD updates).
        self.layers = [
            self.conv1, self.conv2, self.conv3,
            self.fc1, self.fc2
        ]

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Run the whole network: (N, 1, 32, 32) -> (N, num_classes) logits.

        Intermediate shapes: conv1 (N,6,28,28) -> pool1 (N,6,14,14)
        -> conv2 (N,16,10,10) -> pool2 (N,16,5,5) -> conv3 (N,120,1,1)
        -> flatten (N,120) -> fc1 (N,84) -> fc2 (N,num_classes).
        """
        for layer in self._pipeline:
            x = layer.forward(x)
        return x

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Backpropagate through every layer in reverse order."""
        for layer in reversed(self._pipeline):
            grad = layer.backward(grad)
        return grad

    def train_step(
        self,
        images: np.ndarray,
        labels: np.ndarray,
        lr: float = 0.01
    ) -> Tuple[float, float]:
        """Run one SGD step on a mini-batch.

        Returns:
            (loss, accuracy) for this batch.
        """
        logits = self.forward(images)
        loss = self.criterion.forward(logits, labels)

        predicted = np.argmax(logits, axis=1)
        accuracy = np.mean(predicted == labels)

        # Backprop from the loss, then apply gradients.
        self.backward(self.criterion.backward())
        for layer in self.layers:
            layer.update(lr)

        return loss, accuracy

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Return the argmax class index for each image."""
        return np.argmax(self.forward(images), axis=1)


def load_mnist_subset() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Return (X_train, y_train, X_test, y_test) as random stand-in data.

    A real setup would load MNIST via torchvision or keras; this generates
    random tensors with MNIST-like shapes for demonstration only.
    """
    print("Note: ์‹ค์ œ MNIST ๋Œ€์‹  ๋žœ๋ค ๋ฐ์ดํ„ฐ ์‚ฌ์šฉ")

    # Training split: 1000 samples of 1x32x32.
    train_images = np.random.randn(1000, 1, 32, 32).astype(np.float32)
    train_labels = np.random.randint(0, 10, 1000)

    # Test split: 200 samples.
    test_images = np.random.randn(200, 1, 32, 32).astype(np.float32)
    test_labels = np.random.randint(0, 10, 200)

    return train_images, train_labels, test_images, test_labels


def train_lenet():
    """Train LeNet-5 on the stand-in MNIST data and report metrics."""
    print("=== LeNet-5 NumPy Training ===\n")

    # Data
    X_train, y_train, X_test, y_test = load_mnist_subset()
    print(f"Train: {X_train.shape}, Test: {X_test.shape}")

    # Model
    model = LeNet5Numpy(num_classes=10, use_relu=True)

    # Hyperparameters
    epochs = 5
    batch_size = 32
    lr = 0.01
    num_batches = len(X_train) // batch_size

    for epoch in range(epochs):
        # Reshuffle the training set each epoch.
        order = np.random.permutation(len(X_train))
        X_train, y_train = X_train[order], y_train[order]

        total_loss = 0.0
        total_acc = 0.0

        for batch_idx in range(num_batches):
            lo = batch_idx * batch_size
            hi = lo + batch_size

            loss, acc = model.train_step(X_train[lo:hi], y_train[lo:hi], lr)
            total_loss += loss
            total_acc += acc

            if (batch_idx + 1) % 10 == 0:
                print(f"  Batch {batch_idx+1}/{num_batches}, "
                      f"Loss: {loss:.4f}, Acc: {acc:.4f}")

        avg_loss = total_loss / num_batches
        avg_acc = total_acc / num_batches

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"  Train Loss: {avg_loss:.4f}, Acc: {avg_acc:.4f}")

        # Held-out evaluation after each epoch.
        test_acc = np.mean(model.predict(X_test) == y_test)
        print(f"  Test Acc: {test_acc:.4f}")
        print()

    print("Training complete!")


if __name__ == "__main__":
    # Script entry point: run the demo training loop.
    train_lenet()