12. 클라우드 IoT 통합

12. 클라우드 IoT 통합

학습 목표

  • AWS IoT Core 개요 및 설정
  • GCP IoT (Pub/Sub) 연동
  • MQTT 브릿지 구현
  • 디바이스 등록 및 인증
  • 클라우드 데이터 수집 및 분석

1. AWS IoT Core 개요

1.1 AWS IoT Core 아키텍처

┌─────────────────────────────────────────────────────────────┐
│                    AWS IoT Core 아키텍처                     │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   IoT 디바이스                    AWS Cloud                  │
│   ┌─────────┐                                                │
│   │라즈베리 │     MQTT/TLS      ┌──────────────────────┐    │
│   │  파이   │───────────────────▶│    AWS IoT Core     │    │
│   └─────────┘                    │                      │    │
│                                  │  • Message Broker    │    │
│   ┌─────────┐                    │  • Rules Engine      │    │
│   │  ESP32  │───────────────────▶│  • Device Shadow     │    │
│   └─────────┘                    │  • Registry          │    │
│                                  └──────────┬───────────┘    │
│                                             │                │
│        ┌────────────────┬──────────────────┼────────────┐   │
│        │                │                  │            │   │
│        ▼                ▼                  ▼            ▼   │
│   ┌─────────┐    ┌──────────┐     ┌──────────┐  ┌────────┐ │
│   │   S3    │    │ DynamoDB │     │  Lambda  │  │Kinesis │ │
│   │ (저장)  │    │  (DB)    │     │ (처리)   │  │(스트림)│ │
│   └─────────┘    └──────────┘     └──────────┘  └────────┘ │
│                                                              │
└─────────────────────────────────────────────────────────────┘

1.2 핵심 구성 요소

구성 요소 설명
Device Gateway MQTT/HTTPS/WebSocket 연결 관리
Message Broker Pub/Sub 메시지 라우팅
Rules Engine 메시지 필터링 및 AWS 서비스 연동
Device Shadow 디바이스 상태의 가상 복제본
Registry 디바이스 ID 및 인증 관리

1.3 AWS CLI 설정

# AWS CLI 설치
pip install awscli

# 자격 증명 설정
aws configure
# AWS Access Key ID: your-access-key
# AWS Secret Access Key: your-secret-key
# Default region name: ap-northeast-2
# Default output format: json

# IoT 엔드포인트 확인
aws iot describe-endpoint --endpoint-type iot:Data-ATS

2. AWS IoT 디바이스 등록

2.1 인증서 생성

# 디바이스 디렉토리 생성
mkdir -p ~/iot-certs && cd ~/iot-certs

# 인증서 및 키 생성
aws iot create-keys-and-certificate \
    --set-as-active \
    --certificate-pem-outfile device.cert.pem \
    --public-key-outfile device.public.key \
    --private-key-outfile device.private.key

# 루트 CA 다운로드
wget https://www.amazontrust.com/repository/AmazonRootCA1.pem -O root-CA.crt

2.2 정책 생성 및 연결

# IoT 정책 생성
cat > iot-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iot:Connect",
                "iot:Publish",
                "iot:Subscribe",
                "iot:Receive"
            ],
            "Resource": "*"
        }
    ]
}
EOF

aws iot create-policy \
    --policy-name IoTDevicePolicy \
    --policy-document file://iot-policy.json

# 인증서에 정책 연결
aws iot attach-policy \
    --policy-name IoTDevicePolicy \
    --target <인증서-ARN>

# Thing 생성
aws iot create-thing --thing-name RaspberryPi-001

# Thing에 인증서 연결
aws iot attach-thing-principal \
    --thing-name RaspberryPi-001 \
    --principal <인증서-ARN>

2.3 Python 연결 클라이언트

#!/usr/bin/env python3
"""AWS IoT Core 연결"""

from awscrt import mqtt
from awsiot import mqtt_connection_builder
import json
import time

class AWSIoTClient:
    """AWS IoT Core 클라이언트"""

    def __init__(self, endpoint: str, cert_path: str, key_path: str,
                 ca_path: str, client_id: str):
        self.endpoint = endpoint
        self.client_id = client_id

        # MQTT 연결 생성
        self.connection = mqtt_connection_builder.mtls_from_path(
            endpoint=endpoint,
            port=8883,
            cert_filepath=cert_path,
            pri_key_filepath=key_path,
            ca_filepath=ca_path,
            client_id=client_id,
            clean_session=False,
            keep_alive_secs=30
        )

        self.connected = False

    def connect(self):
        """연결"""
        print(f"AWS IoT 연결 중: {self.endpoint}")

        connect_future = self.connection.connect()
        connect_future.result()

        self.connected = True
        print("연결 성공!")

    def disconnect(self):
        """연결 해제"""
        if self.connected:
            disconnect_future = self.connection.disconnect()
            disconnect_future.result()
            print("연결 해제됨")

    def publish(self, topic: str, payload: dict, qos: mqtt.QoS = mqtt.QoS.AT_LEAST_ONCE):
        """메시지 발행"""
        message = json.dumps(payload)

        publish_future, _ = self.connection.publish(
            topic=topic,
            payload=message,
            qos=qos
        )
        publish_future.result()

        print(f"발행: {topic} = {message}")

    def subscribe(self, topic: str, callback):
        """토픽 구독"""
        def on_message(topic, payload, **kwargs):
            message = json.loads(payload)
            callback(topic, message)

        subscribe_future, _ = self.connection.subscribe(
            topic=topic,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_message
        )
        subscribe_future.result()

        print(f"구독: {topic}")

# 사용 예
if __name__ == "__main__":
    # 인증서 경로
    ENDPOINT = "your-endpoint.iot.ap-northeast-2.amazonaws.com"
    CERT_PATH = "~/iot-certs/device.cert.pem"
    KEY_PATH = "~/iot-certs/device.private.key"
    CA_PATH = "~/iot-certs/root-CA.crt"

    client = AWSIoTClient(
        endpoint=ENDPOINT,
        cert_path=CERT_PATH,
        key_path=KEY_PATH,
        ca_path=CA_PATH,
        client_id="RaspberryPi-001"
    )

    try:
        client.connect()

        # 센서 데이터 발행
        for i in range(10):
            data = {
                "device_id": "RaspberryPi-001",
                "temperature": 25.5 + i * 0.1,
                "humidity": 60 + i,
                "timestamp": int(time.time())
            }
            client.publish("sensor/data", data)
            time.sleep(5)

    finally:
        client.disconnect()

3. GCP IoT (Pub/Sub)

중요 안내: GCP IoT Core 서비스는 2023년 8월 16일에 종료되었습니다.

Google은 IoT Core를 대체하여 Cloud Pub/Sub + Cloud Functions 조합을 권장합니다. 기존 IoT Core 사용자는 다음 대안을 고려하세요: - Cloud Pub/Sub: 메시지 브로커 (아래 예제) - Clearblade IoT Core: Google 파트너사의 IoT Core 호환 서비스 - AWS IoT Core: AWS 대안 - Azure IoT Hub: Microsoft 대안

이 레슨에서는 Cloud Pub/Sub를 직접 사용하는 방법을 다룹니다.

3.1 GCP Pub/Sub 아키텍처

┌─────────────────────────────────────────────────────────────┐
│                    GCP Pub/Sub 아키텍처                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   Publisher                                 Subscriber       │
│   (IoT 디바이스)                            (처리 서비스)    │
│   ┌─────────┐        ┌──────────────┐      ┌─────────┐      │
│   │라즈베리 │──────▶│    Topic     │─────▶│ Cloud   │      │
│   │  파이   │        │  (sensor)    │      │ Function│      │
│   └─────────┘        └──────────────┘      └─────────┘      │
│                             │                               │
│                             ▼                               │
│                      ┌──────────────┐                       │
│                      │ Subscription │                       │
│                      │    (pull)    │                       │
│                      └──────────────┘                       │
│                             │                               │
│                             ▼                               │
│                      ┌──────────────┐                       │
│                      │   BigQuery   │                       │
│                      │   (분석)     │                       │
│                      └──────────────┘                       │
│                                                              │
└─────────────────────────────────────────────────────────────┘

3.2 GCP 설정

# gcloud CLI 설치
# https://cloud.google.com/sdk/docs/install

# 인증
gcloud auth login
gcloud config set project YOUR_PROJECT_ID

# Pub/Sub API 활성화
gcloud services enable pubsub.googleapis.com

# 토픽 생성
gcloud pubsub topics create iot-sensor-data

# 구독 생성
gcloud pubsub subscriptions create iot-sensor-sub \
    --topic=iot-sensor-data

# 서비스 계정 생성
gcloud iam service-accounts create iot-device \
    --display-name="IoT Device Service Account"

# 권한 부여
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
    --member="serviceAccount:iot-device@YOUR_PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/pubsub.publisher"

# 키 생성
gcloud iam service-accounts keys create ~/gcp-credentials.json \
    --iam-account=iot-device@YOUR_PROJECT_ID.iam.gserviceaccount.com

3.3 Python Pub/Sub 클라이언트

#!/usr/bin/env python3
"""GCP Pub/Sub 클라이언트"""

from google.cloud import pubsub_v1
import json
import time
import os

# 인증 설정
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'gcp-credentials.json'

class GCPPubSubClient:
    """GCP Pub/Sub 클라이언트"""

    def __init__(self, project_id: str, topic_id: str):
        self.project_id = project_id
        self.topic_id = topic_id
        self.topic_path = f"projects/{project_id}/topics/{topic_id}"

        self.publisher = pubsub_v1.PublisherClient()

    def publish(self, data: dict, **attributes):
        """메시지 발행"""
        message = json.dumps(data).encode('utf-8')

        future = self.publisher.publish(
            self.topic_path,
            message,
            **attributes
        )

        message_id = future.result()
        print(f"발행됨: {message_id}")
        return message_id

    def publish_batch(self, messages: list):
        """배치 발행"""
        futures = []

        for data in messages:
            message = json.dumps(data).encode('utf-8')
            future = self.publisher.publish(self.topic_path, message)
            futures.append(future)

        # 모든 발행 완료 대기
        results = [f.result() for f in futures]
        print(f"배치 발행 완료: {len(results)}개")
        return results

class GCPPubSubSubscriber:
    """GCP Pub/Sub 구독자"""

    def __init__(self, project_id: str, subscription_id: str):
        self.project_id = project_id
        self.subscription_id = subscription_id
        self.subscription_path = f"projects/{project_id}/subscriptions/{subscription_id}"

        self.subscriber = pubsub_v1.SubscriberClient()

    def pull(self, max_messages: int = 10) -> list:
        """메시지 풀"""
        response = self.subscriber.pull(
            subscription=self.subscription_path,
            max_messages=max_messages
        )

        messages = []
        ack_ids = []

        for msg in response.received_messages:
            data = json.loads(msg.message.data.decode('utf-8'))
            messages.append({
                'data': data,
                'attributes': dict(msg.message.attributes),
                'message_id': msg.message.message_id
            })
            ack_ids.append(msg.ack_id)

        # ACK 전송
        if ack_ids:
            self.subscriber.acknowledge(
                subscription=self.subscription_path,
                ack_ids=ack_ids
            )

        return messages

    def subscribe(self, callback):
        """스트리밍 구독"""
        def on_message(message):
            data = json.loads(message.data.decode('utf-8'))
            callback(data, message.attributes)
            message.ack()

        streaming_pull = self.subscriber.subscribe(
            self.subscription_path,
            callback=on_message
        )

        print(f"구독 시작: {self.subscription_path}")
        return streaming_pull

# 사용 예
if __name__ == "__main__":
    PROJECT_ID = "your-project-id"

    # Publisher
    publisher = GCPPubSubClient(PROJECT_ID, "iot-sensor-data")

    for i in range(5):
        data = {
            "device_id": "pi-001",
            "temperature": 25.5 + i * 0.5,
            "timestamp": int(time.time())
        }
        publisher.publish(data, device_type="raspberry_pi")
        time.sleep(1)

4. MQTT 브릿지

4.1 로컬-클라우드 브릿지

#!/usr/bin/env python3
"""MQTT 브릿지: 로컬 -> 클라우드"""

import paho.mqtt.client as mqtt
from google.cloud import pubsub_v1
import json
import threading
import time
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'gcp-credentials.json'

class MQTTBridge:
    """로컬 MQTT를 클라우드로 브릿지"""

    def __init__(self, local_broker: str, gcp_project: str, gcp_topic: str):
        # 로컬 MQTT 클라이언트
        self.local_client = mqtt.Client(client_id="mqtt_bridge")
        self.local_client.on_connect = self._on_local_connect
        self.local_client.on_message = self._on_local_message

        self.local_broker = local_broker

        # GCP Pub/Sub
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = f"projects/{gcp_project}/topics/{gcp_topic}"

        # 브릿지 설정
        self.topic_mapping = {
            "sensor/#": True,
            "device/+/status": True
        }

        self.stats = {
            "messages_received": 0,
            "messages_forwarded": 0,
            "errors": 0
        }

    def _on_local_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("로컬 브로커 연결됨")
            # 토픽 구독
            for topic in self.topic_mapping:
                client.subscribe(topic)
                print(f"구독: {topic}")

    def _on_local_message(self, client, userdata, msg):
        """로컬 메시지 수신 -> 클라우드로 전달"""
        self.stats["messages_received"] += 1

        try:
            # 메시지 파싱
            payload = msg.payload.decode('utf-8')

            try:
                data = json.loads(payload)
            except json.JSONDecodeError:
                data = {"raw": payload}

            # 메타데이터 추가
            cloud_message = {
                "source_topic": msg.topic,
                "data": data,
                "timestamp": time.time()
            }

            # 클라우드로 발행
            self._forward_to_cloud(cloud_message, msg.topic)
            self.stats["messages_forwarded"] += 1

        except Exception as e:
            print(f"브릿지 오류: {e}")
            self.stats["errors"] += 1

    def _forward_to_cloud(self, message: dict, source_topic: str):
        """클라우드로 메시지 전달"""
        data = json.dumps(message).encode('utf-8')

        future = self.publisher.publish(
            self.topic_path,
            data,
            source_topic=source_topic
        )
        future.result()  # 발행 완료 대기

    def start(self):
        """브릿지 시작"""
        print(f"MQTT 브릿지 시작: {self.local_broker} -> GCP")

        self.local_client.connect(self.local_broker, 1883)
        self.local_client.loop_start()

    def stop(self):
        """브릿지 중지"""
        self.local_client.loop_stop()
        self.local_client.disconnect()

        print(f"\n브릿지 통계:")
        print(f"  수신: {self.stats['messages_received']}")
        print(f"  전달: {self.stats['messages_forwarded']}")
        print(f"  오류: {self.stats['errors']}")

    def run(self):
        """메인 루프"""
        self.start()

        try:
            while True:
                time.sleep(10)
                print(f"브릿지 상태: 수신={self.stats['messages_received']}, "
                      f"전달={self.stats['messages_forwarded']}")
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()

if __name__ == "__main__":
    bridge = MQTTBridge(
        local_broker="localhost",
        gcp_project="your-project-id",
        gcp_topic="iot-sensor-data"
    )
    bridge.run()

4.2 양방향 브릿지

#!/usr/bin/env python3
"""양방향 MQTT-클라우드 브릿지"""

import paho.mqtt.client as mqtt
from google.cloud import pubsub_v1
import json
import threading
import time

class BidirectionalBridge:
    """양방향 브릿지"""

    def __init__(self, local_broker: str, gcp_project: str,
                 up_topic: str, down_topic: str, down_sub: str):
        # 로컬 MQTT
        self.local_client = mqtt.Client()
        self.local_broker = local_broker

        # GCP Pub/Sub
        self.gcp_project = gcp_project
        self.publisher = pubsub_v1.PublisherClient()
        self.subscriber = pubsub_v1.SubscriberClient()

        self.up_topic_path = f"projects/{gcp_project}/topics/{up_topic}"
        self.down_sub_path = f"projects/{gcp_project}/subscriptions/{down_sub}"

    def _on_local_message(self, client, userdata, msg):
        """로컬 -> 클라우드"""
        try:
            payload = json.loads(msg.payload.decode())
            data = json.dumps({
                "topic": msg.topic,
                "payload": payload
            }).encode()

            self.publisher.publish(self.up_topic_path, data)
            print(f"[UP] {msg.topic}")

        except Exception as e:
            print(f"업스트림 오류: {e}")

    def _on_cloud_message(self, message):
        """클라우드 -> 로컬"""
        try:
            data = json.loads(message.data.decode())
            topic = data.get("topic", "command")
            payload = data.get("payload", {})

            self.local_client.publish(topic, json.dumps(payload))
            print(f"[DOWN] {topic}")

            message.ack()

        except Exception as e:
            print(f"다운스트림 오류: {e}")
            message.nack()

    def start(self):
        """브릿지 시작"""
        # 로컬 MQTT 시작
        self.local_client.on_message = self._on_local_message
        self.local_client.connect(self.local_broker, 1883)
        self.local_client.subscribe("sensor/#")
        self.local_client.loop_start()

        # 클라우드 구독 시작
        self.streaming_pull = self.subscriber.subscribe(
            self.down_sub_path,
            callback=self._on_cloud_message
        )

        print("양방향 브릿지 시작")

    def stop(self):
        """브릿지 중지"""
        self.streaming_pull.cancel()
        self.local_client.loop_stop()
        self.local_client.disconnect()

    def run(self):
        self.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stop()

5. 데이터 수집 및 분석

5.1 시계열 데이터 저장

#!/usr/bin/env python3
"""IoT 데이터 저장 및 분석"""

from datetime import datetime, timedelta
import json
from typing import List, Dict
from dataclasses import dataclass

# InfluxDB 사용 (시계열 DB)
# pip install influxdb-client

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

@dataclass
class SensorData:
    device_id: str
    temperature: float
    humidity: float
    timestamp: datetime

class TimeSeriesDB:
    """시계열 데이터베이스 클라이언트"""

    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = bucket
        self.org = org

    def write_sensor_data(self, data: SensorData):
        """센서 데이터 저장"""
        point = Point("sensor_reading") \
            .tag("device_id", data.device_id) \
            .field("temperature", data.temperature) \
            .field("humidity", data.humidity) \
            .time(data.timestamp)

        self.write_api.write(bucket=self.bucket, record=point)

    def write_batch(self, data_list: List[SensorData]):
        """배치 저장"""
        points = []
        for data in data_list:
            point = Point("sensor_reading") \
                .tag("device_id", data.device_id) \
                .field("temperature", data.temperature) \
                .field("humidity", data.humidity) \
                .time(data.timestamp)
            points.append(point)

        self.write_api.write(bucket=self.bucket, record=points)

    def query_recent(self, device_id: str, hours: int = 24) -> List[Dict]:
        """최근 데이터 조회"""
        query = f'''
        from(bucket: "{self.bucket}")
            |> range(start: -{hours}h)
            |> filter(fn: (r) => r["device_id"] == "{device_id}")
            |> filter(fn: (r) => r["_measurement"] == "sensor_reading")
        '''

        tables = self.query_api.query(query, org=self.org)

        results = []
        for table in tables:
            for record in table.records:
                results.append({
                    "time": record.get_time(),
                    "field": record.get_field(),
                    "value": record.get_value()
                })

        return results

    def get_statistics(self, device_id: str, hours: int = 24) -> Dict:
        """통계 조회"""
        query = f'''
        from(bucket: "{self.bucket}")
            |> range(start: -{hours}h)
            |> filter(fn: (r) => r["device_id"] == "{device_id}")
            |> filter(fn: (r) => r["_field"] == "temperature")
            |> mean()
        '''

        tables = self.query_api.query(query, org=self.org)

        stats = {}
        for table in tables:
            for record in table.records:
                stats["mean_temperature"] = record.get_value()

        return stats

    def close(self):
        self.client.close()

5.2 대시보드 데이터 API

#!/usr/bin/env python3
"""IoT 대시보드 API"""

from flask import Flask, jsonify, request
from flask_cors import CORS
from datetime import datetime, timedelta
import json

app = Flask(__name__)
CORS(app)

# 데이터 스토어 (실제로는 DB 연결)
data_store = {
    "devices": {},
    "readings": []
}

@app.route('/api/devices')
def list_devices():
    """디바이스 목록"""
    return jsonify({
        "devices": list(data_store["devices"].values()),
        "count": len(data_store["devices"])
    })

@app.route('/api/devices/<device_id>/readings')
def get_readings(device_id):
    """디바이스 데이터 조회"""
    hours = request.args.get('hours', 24, type=int)
    limit = request.args.get('limit', 100, type=int)

    cutoff = datetime.now() - timedelta(hours=hours)

    readings = [
        r for r in data_store["readings"]
        if r["device_id"] == device_id and
           datetime.fromisoformat(r["timestamp"]) > cutoff
    ]

    readings = sorted(readings, key=lambda x: x["timestamp"], reverse=True)[:limit]

    return jsonify({
        "device_id": device_id,
        "readings": readings,
        "count": len(readings)
    })

@app.route('/api/devices/<device_id>/stats')
def get_stats(device_id):
    """디바이스 통계"""
    hours = request.args.get('hours', 24, type=int)
    cutoff = datetime.now() - timedelta(hours=hours)

    readings = [
        r for r in data_store["readings"]
        if r["device_id"] == device_id and
           datetime.fromisoformat(r["timestamp"]) > cutoff
    ]

    if not readings:
        return jsonify({"error": "No data"}), 404

    temps = [r["temperature"] for r in readings if "temperature" in r]
    humids = [r["humidity"] for r in readings if "humidity" in r]

    stats = {
        "device_id": device_id,
        "period_hours": hours,
        "reading_count": len(readings)
    }

    if temps:
        stats["temperature"] = {
            "min": min(temps),
            "max": max(temps),
            "avg": sum(temps) / len(temps)
        }

    if humids:
        stats["humidity"] = {
            "min": min(humids),
            "max": max(humids),
            "avg": sum(humids) / len(humids)
        }

    return jsonify(stats)

@app.route('/api/alerts')
def get_alerts():
    """알림 조회"""
    alerts = []

    # 임계값 체크
    for device_id, device in data_store["devices"].items():
        last_reading = device.get("last_reading")
        if last_reading:
            if last_reading.get("temperature", 0) > 35:
                alerts.append({
                    "device_id": device_id,
                    "type": "high_temperature",
                    "value": last_reading["temperature"],
                    "threshold": 35
                })

    return jsonify({"alerts": alerts, "count": len(alerts)})

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)

6. 종합 IoT 파이프라인

#!/usr/bin/env python3
"""종합 IoT 데이터 파이프라인"""

import paho.mqtt.client as mqtt
from google.cloud import pubsub_v1
import json
import time
import threading
from datetime import datetime
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class IoTMessage:
    device_id: str
    data: dict
    timestamp: datetime
    source: str

class IoTPipeline:
    """IoT 데이터 파이프라인"""

    def __init__(self, config: dict):
        self.config = config
        self.handlers: List[Callable] = []

        # 로컬 MQTT
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.on_message = self._on_mqtt_message

        # 클라우드 (옵션)
        if config.get("gcp_enabled"):
            self.publisher = pubsub_v1.PublisherClient()

    def add_handler(self, handler: Callable):
        """메시지 핸들러 추가"""
        self.handlers.append(handler)

    def _on_mqtt_message(self, client, userdata, msg):
        """MQTT 메시지 처리"""
        try:
            payload = json.loads(msg.payload.decode())

            message = IoTMessage(
                device_id=payload.get("device_id", "unknown"),
                data=payload,
                timestamp=datetime.now(),
                source=msg.topic
            )

            # 핸들러 실행
            for handler in self.handlers:
                try:
                    handler(message)
                except Exception as e:
                    print(f"핸들러 오류: {e}")

        except Exception as e:
            print(f"메시지 처리 오류: {e}")

    def start(self):
        """파이프라인 시작"""
        broker = self.config.get("mqtt_broker", "localhost")
        topics = self.config.get("mqtt_topics", ["sensor/#"])

        self.mqtt_client.connect(broker, 1883)

        for topic in topics:
            self.mqtt_client.subscribe(topic)
            print(f"구독: {topic}")

        self.mqtt_client.loop_start()
        print("IoT 파이프라인 시작됨")

    def stop(self):
        """파이프라인 중지"""
        self.mqtt_client.loop_stop()
        self.mqtt_client.disconnect()

# 핸들러 예시
def log_handler(message: IoTMessage):
    """로깅 핸들러"""
    print(f"[{message.timestamp}] {message.device_id}: {message.data}")

def alert_handler(message: IoTMessage):
    """알림 핸들러"""
    temp = message.data.get("temperature")
    if temp and temp > 35:
        print(f"[ALERT] 고온 감지: {message.device_id} = {temp}°C")

def cloud_handler(message: IoTMessage, publisher, topic_path):
    """클라우드 전송 핸들러"""
    data = json.dumps({
        "device_id": message.device_id,
        "data": message.data,
        "timestamp": message.timestamp.isoformat()
    }).encode()

    publisher.publish(topic_path, data)

# 사용 예
if __name__ == "__main__":
    config = {
        "mqtt_broker": "localhost",
        "mqtt_topics": ["sensor/#", "device/+/status"],
        "gcp_enabled": False
    }

    pipeline = IoTPipeline(config)
    pipeline.add_handler(log_handler)
    pipeline.add_handler(alert_handler)

    pipeline.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pipeline.stop()

연습 문제

문제 1: AWS IoT 연동

  1. AWS IoT Core에 디바이스를 등록하세요.
  2. 센서 데이터를 AWS IoT로 발행하세요.

문제 2: 데이터 분석

  1. 수집된 센서 데이터의 일일 통계를 계산하세요.
  2. 이상치 감지 알림을 구현하세요.

문제 3: 대시보드

  1. 클라우드 데이터를 시각화하는 대시보드를 만드세요.
  2. 실시간 데이터 업데이트를 구현하세요.

마무리

이 레슨에서는 IoT 디바이스를 클라우드 서비스와 연동하는 방법을 학습했습니다. AWS IoT Core와 GCP Pub/Sub을 활용하여 대규모 IoT 시스템을 구축할 수 있습니다.

전체 학습 요약

레슨 주제 핵심 기술
01 IoT 개요 아키텍처, 프로토콜
02 라즈베리파이 설정, SSH, GPIO
03 GPIO 제어 gpiozero, 센서
04 WiFi 소켓, HTTP
05 BLE bleak, GATT
06 MQTT paho-mqtt, Mosquitto
07 REST API Flask, JSON
08-09 Edge AI TFLite, ONNX
10 홈 자동화 릴레이, 웹 대시보드
11 영상 분석 picamera2, 모션 감지
12 클라우드 IoT AWS, GCP, 브릿지

최종 업데이트: 2026-02-01

to navigate between lessons