07. HTTP/REST for IoT
07. HTTP/REST for IoT¶
νμ΅ λͺ©ν¶
- Flaskλ₯Ό μ΄μ©ν IoT μλ² κ΅¬μΆ
- μΌμ λ°μ΄ν° μμ§ API μ€κ³
- RESTful API μ€κ³ μμΉ μ΄ν΄
- JSON λ°μ΄ν° μ²λ¦¬ λ° κ²μ¦
1. Flask IoT μλ²¶
1.1 κΈ°λ³Έ μ€μ ¶
# ν¨ν€μ§ μ€μΉ
pip install flask flask-cors
# μΆκ° μ νΈλ¦¬ν°
pip install python-dotenv
#!/usr/bin/env python3
"""Flask IoT μλ² κΈ°λ³Έ ꡬ쑰"""
from flask import Flask, jsonify, request
from flask_cors import CORS
from datetime import datetime
app = Flask(__name__)
CORS(app) # CORS νμ±ν
# λ©λͺ¨λ¦¬ μ μ₯μ (μ€μ νλ‘μ νΈμμλ DB μ¬μ©)
sensor_data_store = []
devices = {}
@app.route('/')
def index():
"""API μ 보"""
return jsonify({
"name": "IoT API Server",
"version": "1.0",
"endpoints": {
"/api/sensors": "GET, POST",
"/api/sensors/<id>": "GET",
"/api/devices": "GET, POST",
"/api/devices/<id>": "GET, PUT, DELETE"
}
})
@app.route('/health')
def health():
"""ν¬μ€ 체ν¬"""
return jsonify({
"status": "healthy",
"timestamp": datetime.now().isoformat()
})
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000, debug=True)
1.2 νλ‘μ νΈ κ΅¬μ‘°¶
iot_server/
βββ app.py # λ©μΈ μ ν리μΌμ΄μ
βββ config.py # μ€μ
βββ requirements.txt # μμ‘΄μ±
βββ routes/ # λΌμ°νΈ λͺ¨λ
β βββ __init__.py
β βββ sensors.py
β βββ devices.py
βββ models/ # λ°μ΄ν° λͺ¨λΈ
β βββ __init__.py
β βββ sensor.py
βββ utils/ # μ νΈλ¦¬ν°
βββ __init__.py
βββ validators.py
1.3 μ€μ κ΄λ¦¬¶
# config.py
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
"""μ ν리μΌμ΄μ
μ€μ """
SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key')
DEBUG = os.getenv('DEBUG', 'True').lower() == 'true'
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', 5000))
# λ°μ΄ν°λ² μ΄μ€ (SQLite μμ)
SQLALCHEMY_DATABASE_URI = os.getenv(
'DATABASE_URL',
'sqlite:///iot_data.db'
)
# MQTT μ€μ
MQTT_BROKER = os.getenv('MQTT_BROKER', 'localhost')
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
2. μΌμ λ°μ΄ν° API¶
2.1 μΌμ λ°μ΄ν° CRUD¶
#!/usr/bin/env python3
"""μΌμ λ°μ΄ν° API"""
from flask import Blueprint, jsonify, request
from datetime import datetime
import uuid
sensors_bp = Blueprint('sensors', __name__)
# λ©λͺ¨λ¦¬ μ μ₯μ
sensor_readings = []
sensors_registry = {}
# === μΌμ λ±λ‘ ===
@sensors_bp.route('/sensors', methods=['GET'])
def list_sensors():
"""λ±λ‘λ μΌμ λͺ©λ‘"""
return jsonify({
"sensors": list(sensors_registry.values()),
"count": len(sensors_registry)
})
@sensors_bp.route('/sensors', methods=['POST'])
def register_sensor():
"""μ μΌμ λ±λ‘"""
data = request.get_json()
if not data or 'name' not in data:
return jsonify({"error": "name is required"}), 400
sensor_id = str(uuid.uuid4())[:8]
sensor = {
"id": sensor_id,
"name": data['name'],
"type": data.get('type', 'generic'),
"location": data.get('location', 'unknown'),
"registered_at": datetime.now().isoformat(),
"status": "active"
}
sensors_registry[sensor_id] = sensor
return jsonify(sensor), 201
@sensors_bp.route('/sensors/<sensor_id>', methods=['GET'])
def get_sensor(sensor_id):
"""μΌμ μ 보 μ‘°ν"""
sensor = sensors_registry.get(sensor_id)
if not sensor:
return jsonify({"error": "Sensor not found"}), 404
return jsonify(sensor)
# === μΌμ λ°μ΄ν° ===
@sensors_bp.route('/sensors/<sensor_id>/data', methods=['POST'])
def post_sensor_data(sensor_id):
"""μΌμ λ°μ΄ν° μμ """
if sensor_id not in sensors_registry:
# μλ λ±λ‘ (μ΅μ
)
sensors_registry[sensor_id] = {
"id": sensor_id,
"name": f"auto_{sensor_id}",
"registered_at": datetime.now().isoformat(),
"status": "active"
}
data = request.get_json()
if not data:
return jsonify({"error": "No data provided"}), 400
reading = {
"id": str(uuid.uuid4()),
"sensor_id": sensor_id,
"data": data,
"timestamp": datetime.now().isoformat()
}
sensor_readings.append(reading)
# μ΅κ·Ό 1000κ°λ§ μ μ§
if len(sensor_readings) > 1000:
sensor_readings.pop(0)
return jsonify({"status": "ok", "reading_id": reading["id"]}), 201
@sensors_bp.route('/sensors/<sensor_id>/data', methods=['GET'])
def get_sensor_data(sensor_id):
"""μΌμ λ°μ΄ν° μ‘°ν"""
# 쿼리 νλΌλ―Έν°
limit = request.args.get('limit', 100, type=int)
since = request.args.get('since', None) # ISO timestamp
# νν°λ§
readings = [r for r in sensor_readings if r['sensor_id'] == sensor_id]
if since:
readings = [r for r in readings if r['timestamp'] > since]
# μ΅μ μ μ λ ¬ λ° μ ν
readings = sorted(readings, key=lambda x: x['timestamp'], reverse=True)[:limit]
return jsonify({
"sensor_id": sensor_id,
"readings": readings,
"count": len(readings)
})
@sensors_bp.route('/sensors/<sensor_id>/latest', methods=['GET'])
def get_latest_reading(sensor_id):
"""μ΅μ μΌμ λ°μ΄ν°"""
readings = [r for r in sensor_readings if r['sensor_id'] == sensor_id]
if not readings:
return jsonify({"error": "No data found"}), 404
latest = max(readings, key=lambda x: x['timestamp'])
return jsonify(latest)
# λΈλ£¨νλ¦°νΈ λ±λ‘
# app.pyμμ: app.register_blueprint(sensors_bp, url_prefix='/api')
2.2 μ§κ³ API¶
@sensors_bp.route('/sensors/<sensor_id>/stats', methods=['GET'])
def get_sensor_stats(sensor_id):
"""μΌμ λ°μ΄ν° ν΅κ³"""
readings = [r for r in sensor_readings if r['sensor_id'] == sensor_id]
if not readings:
return jsonify({"error": "No data found"}), 404
# μ«μ λ°μ΄ν° μΆμΆ (μ: temperature)
field = request.args.get('field', 'temperature')
values = []
for r in readings:
if field in r.get('data', {}):
try:
values.append(float(r['data'][field]))
except (ValueError, TypeError):
pass
if not values:
return jsonify({"error": f"No numeric data for field: {field}"}), 404
stats = {
"sensor_id": sensor_id,
"field": field,
"count": len(values),
"min": min(values),
"max": max(values),
"avg": sum(values) / len(values),
"latest": values[-1] if values else None
}
return jsonify(stats)
3. REST API μ€κ³¶
3.1 RESTful μμΉ¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β RESTful API μμΉ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. 리μμ€ μ€μ¬ μ€κ³ β
β β’ URLμ λͺ
μ¬ μ¬μ©: /sensors, /devices β
β β’ λμ¬λ HTTP λ©μλλ‘ νν β
β β
β 2. HTTP λ©μλ β
β β’ GET: μ‘°ν (λ©±λ±, μμ ) β
β β’ POST: μμ± β
β β’ PUT: μ 체 μμ (λ©±λ±) β
β β’ PATCH: λΆλΆ μμ β
β β’ DELETE: μμ (λ©±λ±) β
β β
β 3. μν μ½λ β
β β’ 200: μ±κ³΅ β
β β’ 201: μμ±λ¨ β
β β’ 204: λ΄μ© μμ (μμ ) β
β β’ 400: μλͺ»λ μμ² β
β β’ 401: μΈμ¦ νμ β
β β’ 404: 리μμ€ μμ β
β β’ 500: μλ² μ€λ₯ β
β β
β 4. λ²μ λ β
β β’ URL: /api/v1/sensors β
β β’ ν€λ: Accept: application/vnd.api.v1+json β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
3.2 IoT API μ€κ³ μμ¶
# routes/devices.py
"""μ₯μΉ κ΄λ¦¬ API"""
from flask import Blueprint, jsonify, request
from datetime import datetime
devices_bp = Blueprint('devices', __name__)
# μ μ₯μ
devices = {}
@devices_bp.route('/devices', methods=['GET'])
def list_devices():
"""μ₯μΉ λͺ©λ‘ μ‘°ν"""
# νν°λ§
device_type = request.args.get('type')
status = request.args.get('status')
result = list(devices.values())
if device_type:
result = [d for d in result if d.get('type') == device_type]
if status:
result = [d for d in result if d.get('status') == status]
return jsonify({
"devices": result,
"total": len(result)
})
@devices_bp.route('/devices', methods=['POST'])
def create_device():
"""μ₯μΉ λ±λ‘"""
data = request.get_json()
required_fields = ['id', 'name', 'type']
for field in required_fields:
if field not in data:
return jsonify({"error": f"Missing field: {field}"}), 400
device_id = data['id']
if device_id in devices:
return jsonify({"error": "Device already exists"}), 409
device = {
**data,
"status": "offline",
"created_at": datetime.now().isoformat(),
"last_seen": None
}
devices[device_id] = device
return jsonify(device), 201
@devices_bp.route('/devices/<device_id>', methods=['GET'])
def get_device(device_id):
"""μ₯μΉ μ 보 μ‘°ν"""
device = devices.get(device_id)
if not device:
return jsonify({"error": "Device not found"}), 404
return jsonify(device)
@devices_bp.route('/devices/<device_id>', methods=['PUT'])
def update_device(device_id):
"""μ₯μΉ μ 보 μ 체 μμ """
if device_id not in devices:
return jsonify({"error": "Device not found"}), 404
data = request.get_json()
data['id'] = device_id # ID μ μ§
devices[device_id] = {
**data,
"updated_at": datetime.now().isoformat()
}
return jsonify(devices[device_id])
@devices_bp.route('/devices/<device_id>', methods=['PATCH'])
def patch_device(device_id):
"""μ₯μΉ μ 보 λΆλΆ μμ """
if device_id not in devices:
return jsonify({"error": "Device not found"}), 404
data = request.get_json()
# μΌλΆ νλλ§ μ
λ°μ΄νΈ
devices[device_id].update(data)
devices[device_id]['updated_at'] = datetime.now().isoformat()
return jsonify(devices[device_id])
@devices_bp.route('/devices/<device_id>', methods=['DELETE'])
def delete_device(device_id):
"""μ₯μΉ μμ """
if device_id not in devices:
return jsonify({"error": "Device not found"}), 404
del devices[device_id]
return '', 204
# === μ₯μΉ μ μ΄ ===
@devices_bp.route('/devices/<device_id>/commands', methods=['POST'])
def send_command(device_id):
"""μ₯μΉμ λͺ
λ Ή μ μ‘"""
if device_id not in devices:
return jsonify({"error": "Device not found"}), 404
data = request.get_json()
if 'command' not in data:
return jsonify({"error": "Command required"}), 400
# λͺ
λ Ή μ²λ¦¬ (μ€μ λ‘λ MQTT λ°ν λ±)
command = {
"device_id": device_id,
"command": data['command'],
"params": data.get('params', {}),
"sent_at": datetime.now().isoformat()
}
# μ¬κΈ°μ MQTT λ°ν λλ μ§μ μ μ΄
print(f"Command sent: {command}")
return jsonify({
"status": "sent",
"command": command
}), 202
3.3 νμ΄μ§λ€μ΄μ ¶
@sensors_bp.route('/readings', methods=['GET'])
def list_all_readings():
"""λͺ¨λ μΌμ λ°μ΄ν° (νμ΄μ§λ€μ΄μ
)"""
# 쿼리 νλΌλ―Έν°
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
per_page = min(per_page, 100) # μ΅λ 100κ°
# μ λ ¬
sort_by = request.args.get('sort', 'timestamp')
order = request.args.get('order', 'desc')
# λ°μ΄ν° μ λ ¬
readings = sorted(
sensor_readings,
key=lambda x: x.get(sort_by, ''),
reverse=(order == 'desc')
)
# νμ΄μ§λ€μ΄μ
total = len(readings)
start = (page - 1) * per_page
end = start + per_page
page_data = readings[start:end]
return jsonify({
"readings": page_data,
"pagination": {
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page,
"has_next": end < total,
"has_prev": page > 1
}
})
4. JSON λ°μ΄ν° μ²λ¦¬¶
4.1 μμ² κ²μ¦¶
# utils/validators.py
"""μμ² λ°μ΄ν° κ²μ¦"""
from functools import wraps
from flask import request, jsonify
def validate_json(required_fields: list = None, optional_fields: list = None):
"""JSON μμ² κ²μ¦ λ°μ½λ μ΄ν°"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# JSON νμΈ
if not request.is_json:
return jsonify({"error": "Content-Type must be application/json"}), 400
data = request.get_json()
if data is None:
return jsonify({"error": "Invalid JSON"}), 400
# νμ νλ νμΈ
if required_fields:
missing = [f for f in required_fields if f not in data]
if missing:
return jsonify({
"error": "Missing required fields",
"fields": missing
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
# μ¬μ© μ
@app.route('/api/sensors', methods=['POST'])
@validate_json(required_fields=['name', 'type'])
def create_sensor():
data = request.get_json()
# ... μ²λ¦¬
4.2 Pydanticμ μ΄μ©ν κ²μ¦¶
# models/sensor.py
"""Pydantic λͺ¨λΈλ‘ λ°μ΄ν° κ²μ¦"""
from pydantic import BaseModel, Field, validator
from typing import Optional
from datetime import datetime
class SensorReading(BaseModel):
"""μΌμ λ°μ΄ν° λͺ¨λΈ"""
temperature: Optional[float] = Field(None, ge=-50, le=100)
humidity: Optional[float] = Field(None, ge=0, le=100)
pressure: Optional[float] = Field(None, ge=800, le=1200)
timestamp: datetime = Field(default_factory=datetime.now)
@validator('temperature', 'humidity', 'pressure', pre=True)
def round_values(cls, v):
if v is not None:
return round(v, 2)
return v
class SensorCreate(BaseModel):
"""μΌμ μμ± μμ²"""
name: str = Field(..., min_length=1, max_length=100)
type: str = Field(..., pattern=r'^(temperature|humidity|motion|generic)$')
location: Optional[str] = None
class DeviceCommand(BaseModel):
"""μ₯μΉ λͺ
λ Ή"""
command: str = Field(..., pattern=r'^(on|off|toggle|set)$')
params: Optional[dict] = None
# Flaskμμ μ¬μ©
from pydantic import ValidationError
@app.route('/api/sensors/<sensor_id>/data', methods=['POST'])
def post_data(sensor_id):
try:
reading = SensorReading(**request.get_json())
except ValidationError as e:
return jsonify({"error": e.errors()}), 400
# κ²μ¦λ λ°μ΄ν° μ¬μ©
data = reading.dict()
# ...
4.3 μλ΅ ν¬λ§·ν ¶
# utils/response.py
"""μλ΅ ν¬νΌ"""
from flask import jsonify
from datetime import datetime
from functools import wraps
def api_response(data=None, message=None, status_code=200):
"""νμ€ API μλ΅ μμ±"""
response = {
"success": 200 <= status_code < 300,
"timestamp": datetime.now().isoformat()
}
if message:
response["message"] = message
if data is not None:
response["data"] = data
return jsonify(response), status_code
def error_response(message: str, status_code: int = 400, details=None):
"""μλ¬ μλ΅ μμ±"""
response = {
"success": False,
"error": {
"message": message,
"code": status_code
},
"timestamp": datetime.now().isoformat()
}
if details:
response["error"]["details"] = details
return jsonify(response), status_code
# μλ¬ νΈλ€λ¬
@app.errorhandler(404)
def not_found(error):
return error_response("Resource not found", 404)
@app.errorhandler(500)
def internal_error(error):
return error_response("Internal server error", 500)
5. μ’ ν© μμ : IoT κ²μ΄νΈμ¨μ΄ μλ²¶
#!/usr/bin/env python3
"""IoT κ²μ΄νΈμ¨μ΄ μλ²"""
from flask import Flask, jsonify, request
from flask_cors import CORS
from datetime import datetime
import paho.mqtt.client as mqtt
import threading
import json
app = Flask(__name__)
CORS(app)
# λ°μ΄ν° μ μ₯μ
class DataStore:
def __init__(self):
self.sensors = {}
self.readings = []
self.devices = {}
self.commands = []
def add_reading(self, sensor_id: str, data: dict):
reading = {
"sensor_id": sensor_id,
"data": data,
"timestamp": datetime.now().isoformat()
}
self.readings.append(reading)
# μΌμ μλ λ±λ‘
if sensor_id not in self.sensors:
self.sensors[sensor_id] = {
"id": sensor_id,
"last_seen": reading["timestamp"]
}
else:
self.sensors[sensor_id]["last_seen"] = reading["timestamp"]
# μ΅λ 10000κ° μ μ§
if len(self.readings) > 10000:
self.readings = self.readings[-10000:]
store = DataStore()
# === MQTT ν΄λΌμ΄μΈνΈ ===
mqtt_client = mqtt.Client()
def on_mqtt_connect(client, userdata, flags, rc):
if rc == 0:
print("MQTT λΈλ‘컀 μ°κ²°λ¨")
client.subscribe("sensor/#")
def on_mqtt_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
# topic: sensor/{sensor_id}/data
parts = msg.topic.split('/')
if len(parts) >= 2:
sensor_id = parts[1]
store.add_reading(sensor_id, payload)
print(f"[MQTT] {sensor_id}: {payload}")
except Exception as e:
print(f"MQTT λ©μμ§ μ²λ¦¬ μ€λ₯: {e}")
mqtt_client.on_connect = on_mqtt_connect
mqtt_client.on_message = on_mqtt_message
def start_mqtt():
try:
mqtt_client.connect("localhost", 1883)
mqtt_client.loop_start()
except Exception as e:
print(f"MQTT μ°κ²° μ€ν¨: {e}")
# === HTTP API ===
@app.route('/api/sensors', methods=['GET'])
def list_sensors():
return jsonify({
"sensors": list(store.sensors.values()),
"count": len(store.sensors)
})
@app.route('/api/sensors/<sensor_id>/data', methods=['GET'])
def get_sensor_data(sensor_id):
limit = request.args.get('limit', 100, type=int)
readings = [r for r in store.readings if r['sensor_id'] == sensor_id]
readings = readings[-limit:]
return jsonify({
"sensor_id": sensor_id,
"readings": readings,
"count": len(readings)
})
@app.route('/api/sensors/<sensor_id>/data', methods=['POST'])
def post_sensor_data(sensor_id):
data = request.get_json()
if not data:
return jsonify({"error": "No data"}), 400
store.add_reading(sensor_id, data)
# MQTTλ‘λ λ°ν (λ€λ₯Έ ꡬλ
μμκ²)
mqtt_client.publish(f"sensor/{sensor_id}/data", json.dumps(data))
return jsonify({"status": "ok"}), 201
@app.route('/api/devices/<device_id>/command', methods=['POST'])
def send_device_command(device_id):
data = request.get_json()
if not data or 'command' not in data:
return jsonify({"error": "Command required"}), 400
command = {
"device_id": device_id,
"command": data['command'],
"params": data.get('params', {}),
"timestamp": datetime.now().isoformat()
}
# MQTTλ‘ λͺ
λ Ή λ°ν
mqtt_client.publish(
f"device/{device_id}/command",
json.dumps(command)
)
store.commands.append(command)
return jsonify({"status": "sent", "command": command}), 202
@app.route('/api/stats', methods=['GET'])
def get_stats():
return jsonify({
"sensors_count": len(store.sensors),
"readings_count": len(store.readings),
"devices_count": len(store.devices),
"commands_count": len(store.commands)
})
if __name__ == "__main__":
# MQTT μ€λ λ μμ
mqtt_thread = threading.Thread(target=start_mqtt, daemon=True)
mqtt_thread.start()
# Flask μλ² μμ
app.run(host='0.0.0.0', port=5000, debug=True)
μ°μ΅ λ¬Έμ ¶
λ¬Έμ 1: μΌμ API¶
- μΌμ CRUD APIλ₯Ό ꡬννμΈμ.
- λ°μ΄ν° κ²μ¦μ μΆκ°νμΈμ.
λ¬Έμ 2: νμ΄μ§λ€μ΄μ ¶
- μΌμ λ°μ΄ν° λͺ©λ‘μ νμ΄μ§λ€μ΄μ μ ꡬννμΈμ.
- λ μ§ λ²μ νν°λ§μ μΆκ°νμΈμ.
λ¬Έμ 3: MQTT μ°λ¶
- HTTP POSTλ‘ λ°μ λ°μ΄ν°λ₯Ό MQTTλ‘ λ°ννμΈμ.
- MQTTλ‘ λ°μ λ°μ΄ν°λ₯Ό HTTP GETμΌλ‘ μ‘°ννμΈμ.
λ€μ λ¨κ³¶
- 08_Edge_AI_TFLite.md: μΌμ λ°μ΄ν° AI λΆμ
- 10_Home_Automation_Project.md: REST API κΈ°λ° μ€λ§νΈν
μ΅μ’ μ λ°μ΄νΈ: 2026-02-01