09. Edge AI - ONNX Runtime

Learning Objectives

  • Understand ONNX (Open Neural Network Exchange)
  • Install and use ONNX Runtime
  • Learn model optimization techniques
  • Deploy to a Raspberry Pi
  • Implement an object detection example

1. ONNX Overview

1.1 What is ONNX?

ONNX (Open Neural Network Exchange) is an open format that provides model interoperability across ML frameworks: a model trained in one framework can be exported once and executed by any runtime that understands the format.

┌──────────────────────────────────────────────────────────────┐
│                        ONNX Ecosystem                        │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   Training frameworks                    Inference engine    │
│   ┌──────────┐                                               │
│   │ PyTorch  │────┐                                          │
│   └──────────┘    │                                          │
│   ┌──────────┐    │     ┌──────────┐     ┌────────────────┐  │
│   │TensorFlow│────┼────▶│   ONNX   │────▶│  ONNX Runtime  │  │
│   └──────────┘    │     │ (.onnx)  │     │(cross-platform)│  │
│   ┌──────────┐    │     └──────────┘     └────────────────┘  │
│   │  Keras   │────┤                              │           │
│   └──────────┘    │                              ▼           │
│   ┌──────────┐    │                      ┌────────────────┐  │
│   │ Sklearn  │────┘                      │ Deploy targets │  │
│   └──────────┘                           │ • Raspberry Pi │  │
│                                          │ • Windows      │  │
│                                          │ • Android      │  │
│                                          │ • iOS          │  │
│                                          └────────────────┘  │
│                                                              │
└──────────────────────────────────────────────────────────────┘

1.2 ONNX vs TFLite

Feature          ONNX                         TFLite
Developer        Microsoft + partners         Google
Frameworks       PyTorch, TF, Sklearn, etc.   TensorFlow/Keras
Format           .onnx (Protobuf)             .tflite (FlatBuffer)
Runtime          ONNX Runtime                 TF Lite Interpreter
Quantization     Supported                    Supported
Hardware         CPU, GPU, NPU                CPU, GPU, Edge TPU

1.3 ONNX Runtime Features

# Key ONNX Runtime features
onnx_runtime_features = {
    "cross_platform": "Windows, Linux, macOS, Android, iOS",
    "hardware_acceleration": "CPU, CUDA, TensorRT, DirectML, OpenVINO",
    "languages": "Python, C++, C#, Java, JavaScript",
    "optimizations": "graph optimization, quantization, operator fusion",
    "flexibility": "runs models converted from many different frameworks"
}

2. Installing ONNX Runtime

2.1 Installation on Raspberry Pi

# Base ONNX Runtime (CPU)
pip install onnxruntime

# ARM64-optimized build (64-bit Raspberry Pi OS)
# pip install onnxruntime --extra-index-url https://aiinfra.pkgs.visualstudio.com/...

# Additional packages
pip install numpy pillow onnx

# Model conversion tools (run on a PC)
pip install tf2onnx torch onnx-simplifier

2.2 Verifying the Installation

#!/usr/bin/env python3
"""Verify the ONNX Runtime installation"""

import onnxruntime as ort

# Version
print(f"ONNX Runtime version: {ort.__version__}")

# Available execution providers (inference backends)
providers = ort.get_available_providers()
print(f"Available providers: {providers}")

# Quick sanity check: session options can be created and configured
session_options = ort.SessionOptions()
session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

print("\nONNX Runtime is working!")
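
ONNX Runtime silently falls back to the next entry when a requested provider is unavailable, so it helps to build the provider list defensively before creating a session. A minimal sketch, assuming the standard provider names; `select_providers` is a hypothetical helper, and in practice the `available` list would come from `ort.get_available_providers()`:

```python
def select_providers(preferred: list, available: list) -> list:
    """Return the preferred execution providers that are actually
    available, falling back to CPU if none of them are."""
    chosen = [p for p in preferred if p in available]
    return chosen or ["CPUExecutionProvider"]

# Example: prefer CUDA, fall back to CPU (the typical case on a Raspberry Pi)
available = ["CPUExecutionProvider"]  # e.g. ort.get_available_providers()
providers = select_providers(
    ["CUDAExecutionProvider", "CPUExecutionProvider"], available
)
print(providers)  # ['CPUExecutionProvider']
```

The resulting list can be passed directly as the `providers` argument of `ort.InferenceSession`.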

3. λͺ¨λΈ λ³€ν™˜

3.1 PyTorch to ONNX

#!/usr/bin/env python3
"""PyTorch λͺ¨λΈμ„ ONNX둜 λ³€ν™˜"""

import torch
import torch.nn as nn

# μ˜ˆμ‹œ λͺ¨λΈ
class SimpleNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 50)
        self.fc2 = nn.Linear(50, 3)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

def export_to_onnx(model, output_path: str, input_shape: tuple):
    """PyTorch λͺ¨λΈμ„ ONNX둜 내보내기"""
    model.eval()

    # 더미 μž…λ ₯
    dummy_input = torch.randn(*input_shape)

    # ONNX 내보내기
    torch.onnx.export(
        model,
        dummy_input,
        output_path,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        },
        opset_version=13
    )

    print(f"ONNX λͺ¨λΈ μ €μž₯: {output_path}")

# μ‚¬μš© 예
if __name__ == "__main__":
    model = SimpleNet()
    export_to_onnx(model, "simple_net.onnx", (1, 10))

3.2 TensorFlow/Keras to ONNX

# Using tf2onnx (command line)
python -m tf2onnx.convert \
    --saved-model tensorflow_model/ \
    --output model.onnx \
    --opset 13

#!/usr/bin/env python3
"""Convert a TensorFlow/Keras model to ONNX"""

import tensorflow as tf
import tf2onnx

def keras_to_onnx(model_path: str, output_path: str):
    """Convert a Keras model to ONNX"""
    # Load the Keras model
    model = tf.keras.models.load_model(model_path)

    # Convert to ONNX
    onnx_model, _ = tf2onnx.convert.from_keras(
        model,
        opset=13,
        output_path=output_path
    )

    print(f"Conversion complete: {output_path}")

# Usage
keras_to_onnx("my_model.h5", "my_model.onnx")

3.3 λͺ¨λΈ 검증 및 λ‹¨μˆœν™”

#!/usr/bin/env python3
"""ONNX λͺ¨λΈ 검증 및 λ‹¨μˆœν™”"""

import onnx
from onnxsim import simplify

def validate_and_simplify(model_path: str, output_path: str = None):
    """ONNX λͺ¨λΈ 검증 및 μ΅œμ ν™”"""
    # λͺ¨λΈ λ‘œλ“œ
    model = onnx.load(model_path)

    # 검증
    try:
        onnx.checker.check_model(model)
        print("λͺ¨λΈ 검증 톡과")
    except Exception as e:
        print(f"검증 μ‹€νŒ¨: {e}")
        return

    # λͺ¨λΈ 정보 좜λ ₯
    print(f"\nλͺ¨λΈ 정보:")
    print(f"  IR 버전: {model.ir_version}")
    print(f"  Opset: {model.opset_import[0].version}")
    print(f"  κ·Έλž˜ν”„ 이름: {model.graph.name}")

    # μž…μΆœλ ₯ 정보
    print(f"\nμž…λ ₯:")
    for input in model.graph.input:
        print(f"  {input.name}: {input.type}")

    print(f"\n좜λ ₯:")
    for output in model.graph.output:
        print(f"  {output.name}: {output.type}")

    # λ‹¨μˆœν™” (쀑볡 μ—°μ‚° 제거, κ·Έλž˜ν”„ μ΅œμ ν™”)
    simplified_model, check = simplify(model)

    if check:
        print("\nλ‹¨μˆœν™” 성곡")

        if output_path:
            onnx.save(simplified_model, output_path)
            print(f"μ €μž₯: {output_path}")

            # 크기 비ꡐ
            import os
            orig_size = os.path.getsize(model_path) / 1024
            new_size = os.path.getsize(output_path) / 1024
            print(f"\n크기: {orig_size:.1f}KB -> {new_size:.1f}KB")

        return simplified_model
    else:
        print("λ‹¨μˆœν™” μ‹€νŒ¨")
        return model

# μ‚¬μš© 예
if __name__ == "__main__":
    validate_and_simplify("model.onnx", "model_simplified.onnx")

4. Running Inference

4.1 Basic Inference

#!/usr/bin/env python3
"""Basic inference with ONNX Runtime"""

import onnxruntime as ort
import numpy as np

class ONNXModel:
    """Thin wrapper around an ONNX model"""

    def __init__(self, model_path: str, providers: list = None):
        if providers is None:
            providers = ['CPUExecutionProvider']

        # Session options
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_options.intra_op_num_threads = 4

        # Create the session
        self.session = ort.InferenceSession(
            model_path,
            sess_options=sess_options,
            providers=providers
        )

        # Input/output metadata
        self.input_name = self.session.get_inputs()[0].name
        self.input_shape = self.session.get_inputs()[0].shape
        self.output_name = self.session.get_outputs()[0].name

    def get_input_shape(self):
        return self.input_shape

    def predict(self, input_data: np.ndarray) -> np.ndarray:
        """Run inference"""
        outputs = self.session.run(
            [self.output_name],
            {self.input_name: input_data}
        )
        return outputs[0]

# Usage
if __name__ == "__main__":
    model = ONNXModel("model.onnx")

    print(f"Input shape: {model.get_input_shape()}")

    # Dummy input
    input_data = np.random.randn(1, 10).astype(np.float32)
    output = model.predict(input_data)

    print(f"Output shape: {output.shape}")
    print(f"Output values: {output}")
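
The raw output of a classification model like `SimpleNet` is a vector of logits; turning it into class probabilities is a common post-processing step. A small, numerically stable softmax in NumPy (a generic sketch, not tied to any particular model):

```python
import numpy as np

def softmax(logits: np.ndarray) -> np.ndarray:
    """Numerically stable softmax over the last axis."""
    # Subtracting the max avoids overflow in exp() without changing the result
    shifted = logits - np.max(logits, axis=-1, keepdims=True)
    exp = np.exp(shifted)
    return exp / np.sum(exp, axis=-1, keepdims=True)

logits = np.array([[2.0, 1.0, 0.1]])
probs = softmax(logits)
print(round(float(probs.sum()), 6))   # 1.0
print(int(np.argmax(probs)))          # 0
```

`np.argmax(probs)` then gives the predicted class index, exactly as one would apply it to the `output` array returned by `ONNXModel.predict`.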

4.2 Batch Inference

#!/usr/bin/env python3
"""Batch inference with ONNX Runtime"""

import onnxruntime as ort
import numpy as np
import time

def batch_inference(model_path: str, data: np.ndarray,
                    batch_size: int = 32) -> np.ndarray:
    """Run inference in batches"""
    session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])

    input_name = session.get_inputs()[0].name
    output_name = session.get_outputs()[0].name

    results = []
    num_samples = len(data)

    for i in range(0, num_samples, batch_size):
        batch = data[i:i + batch_size]
        output = session.run([output_name], {input_name: batch})[0]
        results.append(output)

    return np.concatenate(results, axis=0)

# Performance measurement
def benchmark_batch_sizes(model_path: str, input_shape: tuple):
    """Compare throughput across batch sizes"""
    session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])

    input_name = session.get_inputs()[0].name

    total_samples = 1000

    for batch_size in [1, 8, 16, 32, 64]:
        data = np.random.randn(total_samples, *input_shape[1:]).astype(np.float32)

        start = time.perf_counter()

        for i in range(0, total_samples, batch_size):
            batch = data[i:i + batch_size]
            _ = session.run(None, {input_name: batch})

        elapsed = time.perf_counter() - start
        throughput = total_samples / elapsed

        print(f"Batch size {batch_size:2d}: {throughput:.1f} samples/sec")

4.3 μ–‘μžν™” μΆ”λ‘ 

#!/usr/bin/env python3
"""ONNX Runtime μ–‘μžν™”"""

import onnxruntime as ort
from onnxruntime.quantization import quantize_dynamic, QuantType

def quantize_model(model_path: str, output_path: str):
    """동적 μ–‘μžν™” 적용"""
    quantize_dynamic(
        model_input=model_path,
        model_output=output_path,
        weight_type=QuantType.QInt8
    )

    import os
    orig_size = os.path.getsize(model_path) / (1024 * 1024)
    new_size = os.path.getsize(output_path) / (1024 * 1024)

    print(f"원본: {orig_size:.2f} MB")
    print(f"μ–‘μžν™”: {new_size:.2f} MB")
    print(f"μ••μΆ•λ₯ : {orig_size / new_size:.1f}x")

# μ‚¬μš© 예
quantize_model("model.onnx", "model_quantized.onnx")
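
Conceptually, dynamic weight quantization stores each weight tensor as int8 plus a floating-point scale and dequantizes at run time. A NumPy sketch of the symmetric per-tensor int8 scheme (illustrative only; ONNX Runtime's actual implementation also supports zero points and per-channel scales):

```python
import numpy as np

def quantize_int8(w: np.ndarray) -> tuple:
    """Symmetric per-tensor int8 quantization: w is approximated by q * scale.
    Assumes w is not all zeros."""
    scale = float(np.max(np.abs(w))) / 127.0
    q = np.clip(np.round(w / scale), -127, 127).astype(np.int8)
    return q, scale

def dequantize(q: np.ndarray, scale: float) -> np.ndarray:
    """Recover an approximation of the original float32 tensor."""
    return q.astype(np.float32) * scale

w = np.random.randn(50, 50).astype(np.float32)
q, scale = quantize_int8(w)
err = float(np.max(np.abs(dequantize(q, scale) - w)))

# int8 storage is 4x smaller than float32 for the same tensor
print(f"int8 storage: {q.nbytes} bytes vs float32: {w.nbytes} bytes")
print(f"max reconstruction error: {err:.4f}")
```

This also shows why the measured compression in `quantize_model` is close to, but not exactly, 4x: only the weights shrink to one byte each, while scales, graph structure, and any non-quantized tensors stay the same size.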

5. Object Detection Example

5.1 Using a YOLO ONNX Model

#!/usr/bin/env python3
"""YOLOv5 ONNX object detection"""

import onnxruntime as ort
import numpy as np
import cv2

class YOLODetector:
    """YOLOv5 ONNX object detector"""

    # COCO classes
    CLASSES = [
        'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train',
        'truck', 'boat', 'traffic light', 'fire hydrant', 'stop sign',
        'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep',
        'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella',
        'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard',
        'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard',
        'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork',
        'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
        'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair',
        'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv',
        'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave',
        'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase',
        'scissors', 'teddy bear', 'hair drier', 'toothbrush'
    ]

    def __init__(self, model_path: str, conf_threshold: float = 0.5,
                 iou_threshold: float = 0.45):
        self.session = ort.InferenceSession(
            model_path,
            providers=['CPUExecutionProvider']
        )

        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold

        # Input metadata
        input_info = self.session.get_inputs()[0]
        self.input_name = input_info.name
        self.input_shape = input_info.shape
        self.input_height = self.input_shape[2]
        self.input_width = self.input_shape[3]

    def preprocess(self, image: np.ndarray) -> tuple:
        """Image preprocessing"""
        orig_height, orig_width = image.shape[:2]

        # Resize
        resized = cv2.resize(image, (self.input_width, self.input_height))

        # BGR to RGB, HWC to CHW
        input_data = resized[:, :, ::-1].transpose(2, 0, 1)

        # Normalize to 0-1
        input_data = input_data.astype(np.float32) / 255.0

        # Add batch dimension
        input_data = np.expand_dims(input_data, axis=0)

        # Scale factors for mapping boxes back to the original image
        scale = (orig_width / self.input_width, orig_height / self.input_height)

        return input_data, scale

    def postprocess(self, output: np.ndarray, scale: tuple) -> list:
        """Output post-processing"""
        predictions = output[0]

        boxes = []
        scores = []
        class_ids = []

        for pred in predictions:
            confidence = pred[4]  # objectness score

            if confidence > self.conf_threshold:
                class_probs = pred[5:]
                class_id = np.argmax(class_probs)
                class_score = class_probs[class_id]

                if class_score > self.conf_threshold:
                    # Box coordinates (center_x, center_y, width, height)
                    cx, cy, w, h = pred[:4]

                    # Convert back to original image scale
                    x1 = int((cx - w / 2) * scale[0])
                    y1 = int((cy - h / 2) * scale[1])
                    x2 = int((cx + w / 2) * scale[0])
                    y2 = int((cy + h / 2) * scale[1])

                    boxes.append([x1, y1, x2, y2])
                    scores.append(float(confidence * class_score))
                    class_ids.append(int(class_id))

        # NMS (Non-Maximum Suppression)
        if boxes:
            # cv2.dnn.NMSBoxes expects boxes as (x, y, width, height)
            nms_boxes = [[x1, y1, x2 - x1, y2 - y1] for x1, y1, x2, y2 in boxes]
            indices = cv2.dnn.NMSBoxes(
                nms_boxes, scores, self.conf_threshold, self.iou_threshold
            )

            results = []
            for i in indices:
                # OpenCV versions differ: entries may be scalars or [idx] lists
                idx = i[0] if isinstance(i, (list, np.ndarray)) else i
                results.append({
                    'box': boxes[idx],
                    'score': scores[idx],
                    'class_id': class_ids[idx],
                    'class_name': self.CLASSES[class_ids[idx]]
                })

            return results

        return []

    def detect(self, image: np.ndarray) -> list:
        """Detect objects"""
        input_data, scale = self.preprocess(image)

        outputs = self.session.run(None, {self.input_name: input_data})

        detections = self.postprocess(outputs[0], scale)

        return detections

    def draw_detections(self, image: np.ndarray, detections: list) -> np.ndarray:
        """Draw detection results"""
        result = image.copy()

        for det in detections:
            x1, y1, x2, y2 = det['box']
            label = f"{det['class_name']}: {det['score']:.2f}"

            # Box
            cv2.rectangle(result, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Label
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(result, (x1, y1 - 20), (x1 + w, y1), (0, 255, 0), -1)
            cv2.putText(result, label, (x1, y1 - 5),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

        return result

# Usage
if __name__ == "__main__":
    detector = YOLODetector("yolov5s.onnx")

    # Load an image
    image = cv2.imread("test_image.jpg")

    # Detect
    detections = detector.detect(image)

    print(f"Detected objects: {len(detections)}")
    for det in detections:
        print(f"  {det['class_name']}: {det['score']:.2f}")

    # Save the result
    result_image = detector.draw_detections(image, detections)
    cv2.imwrite("result.jpg", result_image)
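
`cv2.dnn.NMSBoxes` hides what the algorithm actually does: keep the highest-scoring box, then drop every remaining box whose IoU (intersection over union) with it exceeds the threshold, and repeat. A pure-NumPy sketch for (x1, y1, x2, y2) boxes, shown here only to make the mechanism explicit:

```python
import numpy as np

def iou(box, boxes):
    """IoU between one box and an array of boxes, all (x1, y1, x2, y2)."""
    x1 = np.maximum(box[0], boxes[:, 0])
    y1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[2], boxes[:, 2])
    y2 = np.minimum(box[3], boxes[:, 3])
    inter = np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1)
    area_a = (box[2] - box[0]) * (box[3] - box[1])
    area_b = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    return inter / (area_a + area_b - inter)

def nms(boxes: np.ndarray, scores: np.ndarray, iou_threshold: float) -> list:
    """Return indices of the boxes kept after non-maximum suppression."""
    order = np.argsort(scores)[::-1]  # highest score first
    keep = []
    while len(order) > 0:
        best = order[0]
        keep.append(int(best))
        rest = order[1:]
        if len(rest) == 0:
            break
        # Drop boxes that overlap the best box too much
        order = rest[iou(boxes[best], boxes[rest]) <= iou_threshold]
    return keep

boxes = np.array([[0, 0, 10, 10], [1, 1, 11, 11], [20, 20, 30, 30]], dtype=float)
scores = np.array([0.9, 0.8, 0.7])
print(nms(boxes, scores, 0.45))  # [0, 2] - box 1 is suppressed by box 0
```

Box 1 overlaps box 0 with IoU of roughly 0.68, above the 0.45 threshold, so only the highest-scoring of the pair survives; box 2 is far away and is kept.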

5.2 Real-Time Object Detection

#!/usr/bin/env python3
"""Real-time object detection (Pi Camera + ONNX)"""

import cv2
import time

try:
    from picamera2 import Picamera2
    HAS_CAMERA = True
except ImportError:
    HAS_CAMERA = False

# The YOLODetector class is the same as in section 5.1

class RealtimeDetector:
    """Real-time object detector"""

    def __init__(self, model_path: str):
        self.detector = YOLODetector(model_path)

        if HAS_CAMERA:
            self.camera = Picamera2()
            config = self.camera.create_preview_configuration(
                main={"size": (640, 480), "format": "RGB888"}
            )
            self.camera.configure(config)

    def run(self, duration: float = 60, display: bool = False):
        """Run real-time detection"""
        if not HAS_CAMERA:
            print("No camera available")
            return

        self.camera.start()
        print(f"Real-time detection started ({duration}s)")

        start_time = time.time()
        frame_count = 0
        fps_time = time.time()

        try:
            while time.time() - start_time < duration:
                # Capture a frame
                frame = self.camera.capture_array()

                # Convert to BGR (OpenCV convention)
                frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

                # Detect
                detections = self.detector.detect(frame_bgr)

                frame_count += 1

                # FPS calculation
                if frame_count % 10 == 0:
                    elapsed = time.time() - fps_time
                    fps = 10 / elapsed
                    fps_time = time.time()

                    print(f"\rFPS: {fps:.1f}, detections: {len(detections)}", end="")

                    for det in detections:
                        print(f" | {det['class_name']}", end="")

                # Display (optional)
                if display:
                    result = self.detector.draw_detections(frame_bgr, detections)
                    cv2.imshow("Detection", result)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

        except KeyboardInterrupt:
            pass
        finally:
            self.camera.stop()
            if display:
                cv2.destroyAllWindows()

            total_time = time.time() - start_time
            avg_fps = frame_count / total_time
            print(f"\n\nAverage FPS: {avg_fps:.1f}")

if __name__ == "__main__":
    detector = RealtimeDetector("yolov5s.onnx")
    detector.run(duration=30, display=False)

5.3 Publishing Detection Results over MQTT

#!/usr/bin/env python3
"""Publish object detection results over MQTT"""

import json
import time

import cv2
import paho.mqtt.client as mqtt

# The YOLODetector class is the same as in section 5.1

class DetectionPublisher:
    """MQTT publisher for detection results"""

    def __init__(self, model_path: str, mqtt_broker: str = "localhost"):
        self.detector = YOLODetector(model_path)

        self.mqtt_client = mqtt.Client()
        self.mqtt_client.connect(mqtt_broker, 1883)
        self.mqtt_client.loop_start()

        self.node_id = "detector_01"

    def process_and_publish(self, image_path: str):
        """Process an image and publish the results"""
        image = cv2.imread(image_path)
        if image is None:
            print(f"Failed to load image: {image_path}")
            return

        # Detect
        start = time.perf_counter()
        detections = self.detector.detect(image)
        inference_time = (time.perf_counter() - start) * 1000

        # Build the result payload
        result = {
            "node_id": self.node_id,
            "image": image_path,
            "detections": [
                {
                    "class": det['class_name'],
                    "score": round(det['score'], 3),
                    "box": det['box']
                }
                for det in detections
            ],
            "count": len(detections),
            "inference_time_ms": round(inference_time, 2),
            "timestamp": time.time()
        }

        # Publish over MQTT
        topic = f"edge/{self.node_id}/detection"
        self.mqtt_client.publish(topic, json.dumps(result))

        print(f"Published: {topic}")
        print(f"  detections: {len(detections)}, time: {inference_time:.1f}ms")

        return result

    def shutdown(self):
        self.mqtt_client.loop_stop()
        self.mqtt_client.disconnect()

if __name__ == "__main__":
    publisher = DetectionPublisher("yolov5s.onnx")

    try:
        publisher.process_and_publish("test_image.jpg")
    finally:
        publisher.shutdown()
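
On the receiving side, a subscriber decodes the same JSON payload inside its message callback. A minimal sketch of just the decoding logic (the field names match the `result` dict built by `DetectionPublisher` above; the broker and transport parts are omitted, and `parse_detection_payload` is a hypothetical helper):

```python
import json

def parse_detection_payload(payload: bytes) -> dict:
    """Decode a detection message shaped like DetectionPublisher's output."""
    msg = json.loads(payload)
    return {
        "node": msg["node_id"],
        "count": msg["count"],
        "classes": [d["class"] for d in msg["detections"]],
        "latency_ms": msg["inference_time_ms"],
    }

# Example payload, shaped like the publisher's output
payload = json.dumps({
    "node_id": "detector_01",
    "image": "test_image.jpg",
    "detections": [{"class": "person", "score": 0.91, "box": [10, 20, 110, 220]}],
    "count": 1,
    "inference_time_ms": 85.3,
    "timestamp": 1700000000.0,
}).encode()

print(parse_detection_payload(payload))
```

In a real subscriber this function would be called from paho-mqtt's `on_message` callback with `msg.payload`.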

Exercises

Exercise 1: Model Conversion

  1. Convert a PyTorch image classification model to ONNX.
  2. Validate and simplify the converted model.

Exercise 2: Performance Comparison

  1. Compare inference speed between TFLite and ONNX Runtime.
  2. Measure throughput for different batch sizes.

Exercise 3: Real-Time Detection

  1. Implement real-time object detection with a YOLO model.
  2. Publish the detection results over MQTT.

λ‹€μŒ 단계


μ΅œμ’… μ—…λ°μ΄νŠΈ: 2026-02-01
