04. WiFi ๋คํธ์ํน
04. WiFi ๋คํธ์ํน¶
ํ์ต ๋ชฉํ¶
- ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด WiFi ์ค์ ๋ฐฉ๋ฒ ์ต๋
- Python ์์ผ ํ๋ก๊ทธ๋๋ฐ ๊ธฐ์ด ์ดํด
- ESP32 WiFi ๊ฐ์ ํ์
- ๋คํธ์ํฌ ์ค์บ ๋ฐ ๋ชจ๋ํฐ๋ง
- HTTP ํด๋ผ์ด์ธํธ๋ก ๋ฐ์ดํฐ ์ ์ก
1. ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด WiFi ์ค์ ¶
1.1 ๋ช ๋ น์ค WiFi ์ค์ ¶
# ํ์ฌ WiFi ์ํ ํ์ธ
iwconfig wlan0
# ์ฌ์ฉ ๊ฐ๋ฅํ ๋คํธ์ํฌ ์ค์บ
sudo iwlist wlan0 scan | grep -E "ESSID|Quality"
# WiFi ์ฐ๊ฒฐ (nmcli ์ฌ์ฉ)
sudo nmcli dev wifi connect "SSID์ด๋ฆ" password "๋น๋ฐ๋ฒํธ"
# ์ฐ๊ฒฐ ์ํ ํ์ธ
nmcli connection show
# IP ์ฃผ์ ํ์ธ
ip addr show wlan0
1.2 wpa_supplicant ์ค์ ¶
# /etc/wpa_supplicant/wpa_supplicant.conf ํธ์ง
sudo nano /etc/wpa_supplicant/wpa_supplicant.conf
# /etc/wpa_supplicant/wpa_supplicant.conf
country=KR
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
update_config=1
# ๊ธฐ๋ณธ WPA2 ๋คํธ์ํฌ
network={
ssid="MyNetwork"
psk="MyPassword"
key_mgmt=WPA-PSK
}
# ์จ๊ฒจ์ง ๋คํธ์ํฌ
network={
ssid="HiddenNetwork"
scan_ssid=1
psk="Password"
}
# ์ฐ์ ์์ ์ค์ (๋์ ๊ฐ = ์ฐ์ )
network={
ssid="PreferredNetwork"
psk="Password"
priority=10
}
1.3 Python์ผ๋ก WiFi ์ ๋ณด ์กฐํ¶
#!/usr/bin/env python3
"""WiFi ์ฐ๊ฒฐ ์ ๋ณด ์กฐํ"""
import subprocess
import re
def get_wifi_info() -> dict:
"""ํ์ฌ WiFi ์ฐ๊ฒฐ ์ ๋ณด ๋ฐํ"""
info = {}
try:
# SSID ์กฐํ
result = subprocess.run(
['iwgetid', '-r'],
capture_output=True,
text=True
)
info['ssid'] = result.stdout.strip()
# IP ์ฃผ์ ์กฐํ
result = subprocess.run(
['hostname', '-I'],
capture_output=True,
text=True
)
ips = result.stdout.strip().split()
info['ip_addresses'] = ips
# ์ ํธ ๊ฐ๋ ์กฐํ
result = subprocess.run(
['iwconfig', 'wlan0'],
capture_output=True,
text=True
)
match = re.search(r'Signal level=(-?\d+)', result.stdout)
if match:
info['signal_dbm'] = int(match.group(1))
# MAC ์ฃผ์
result = subprocess.run(
['cat', '/sys/class/net/wlan0/address'],
capture_output=True,
text=True
)
info['mac_address'] = result.stdout.strip()
except Exception as e:
info['error'] = str(e)
return info
def get_wifi_networks() -> list:
"""์ฃผ๋ณ WiFi ๋คํธ์ํฌ ์ค์บ"""
networks = []
try:
result = subprocess.run(
['sudo', 'iwlist', 'wlan0', 'scan'],
capture_output=True,
text=True
)
current_network = {}
for line in result.stdout.split('\n'):
if 'ESSID:' in line:
ssid = re.search(r'ESSID:"(.+)"', line)
if ssid and current_network:
networks.append(current_network)
current_network = {'ssid': ssid.group(1) if ssid else ''}
elif 'Quality=' in line:
quality = re.search(r'Quality=(\d+)/(\d+)', line)
if quality:
current_network['quality'] = f"{quality.group(1)}/{quality.group(2)}"
signal = re.search(r'Signal level=(-?\d+)', line)
if signal:
current_network['signal_dbm'] = int(signal.group(1))
if current_network:
networks.append(current_network)
except Exception as e:
print(f"์ค์บ ์คํจ: {e}")
return networks
if __name__ == "__main__":
print("=== WiFi ์ฐ๊ฒฐ ์ ๋ณด ===")
info = get_wifi_info()
for key, value in info.items():
print(f" {key}: {value}")
print("\n=== ์ฃผ๋ณ WiFi ๋คํธ์ํฌ ===")
networks = get_wifi_networks()
for net in networks[:10]: # ์์ 10๊ฐ๋ง
print(f" {net.get('ssid', 'Unknown')}: {net.get('signal_dbm', 'N/A')} dBm")
2. Python ์์ผ ํ๋ก๊ทธ๋๋ฐ¶
2.1 ์์ผ ๊ธฐ์ด¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ์์ผ ํต์ ํ๋ฆ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ ํด๋ผ์ด์ธํธ ์๋ฒ โ
โ โโโโโโโโโโโ โโโโโโโโโโโ โ
โ โ socket()โ โ socket()โ โ
โ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โ
โ โ โ โ
โ โ โโโโโโดโโโโโ โ
โ โ โ bind() โ โ
โ โ โโโโโโฌโโโโโ โ
โ โ โโโโโโดโโโโโ โ
โ โ โ listen()โ โ
โ โ โโโโโโฌโโโโโ โ
โ โโโโโโดโโโโโ ์ฐ๊ฒฐ ์์ฒญ โโโโโโดโโโโโ โ
โ โconnect()โ โโโโโโโโโโโโโโโโโโโโโโโถโ accept()โ โ
โ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โ
โ โ โ โ
โ โโโโโโดโโโโโ ๋ฐ์ดํฐ ์ก์์ โโโโโโดโโโโโ โ
โ โ send() โ โโโโโโโโโโโโโโโโโโโโโโถโ recv() โ โ
โ โ recv() โ โ send() โ โ
โ โโโโโโฌโโโโโ โโโโโโฌโโโโโ โ
โ โ โ โ
โ โโโโโโดโโโโโ โโโโโโดโโโโโ โ
โ โ close() โ โ close() โ โ
โ โโโโโโโโโโโ โโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
2.2 TCP ์๋ฒ¶
#!/usr/bin/env python3
"""TCP ์๋ฒ - ์ผ์ ๋ฐ์ดํฐ ์์ """
import socket
import json
from datetime import datetime
HOST = '0.0.0.0' # ๋ชจ๋ ์ธํฐํ์ด์ค
PORT = 9999
def start_tcp_server():
"""TCP ์๋ฒ ์์"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
# ์ฃผ์ ์ฌ์ฌ์ฉ ํ์ฉ
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(5)
print(f"TCP ์๋ฒ ์์: {HOST}:{PORT}")
while True:
client, address = server.accept()
print(f"ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ: {address}")
with client:
while True:
data = client.recv(1024)
if not data:
break
try:
# JSON ๋ฐ์ดํฐ ํ์ฑ
message = json.loads(data.decode('utf-8'))
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] ์์ : {message}")
# ์๋ต ์ ์ก
response = {
"status": "ok",
"received": message.get("sensor_id")
}
client.sendall(json.dumps(response).encode('utf-8'))
except json.JSONDecodeError:
print(f"์๋ชป๋ JSON: {data}")
print(f"ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ์ข
๋ฃ: {address}")
if __name__ == "__main__":
start_tcp_server()
2.3 TCP ํด๋ผ์ด์ธํธ¶
#!/usr/bin/env python3
"""TCP ํด๋ผ์ด์ธํธ - ์ผ์ ๋ฐ์ดํฐ ์ ์ก"""
import socket
import json
import time
import random
SERVER_HOST = '192.168.1.100' # ์๋ฒ IP
SERVER_PORT = 9999
def send_sensor_data():
"""์ผ์ ๋ฐ์ดํฐ๋ฅผ ์๋ฒ๋ก ์ ์ก"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
client.connect((SERVER_HOST, SERVER_PORT))
print(f"์๋ฒ ์ฐ๊ฒฐ: {SERVER_HOST}:{SERVER_PORT}")
sensor_id = "temp_sensor_01"
try:
while True:
# ์ผ์ ๋ฐ์ดํฐ ์์ฑ
data = {
"sensor_id": sensor_id,
"temperature": round(random.uniform(20, 30), 1),
"humidity": round(random.uniform(40, 70), 1),
"timestamp": time.time()
}
# ์ ์ก
message = json.dumps(data).encode('utf-8')
client.sendall(message)
print(f"์ ์ก: {data}")
# ์๋ต ์์
response = client.recv(1024)
if response:
print(f"์๋ต: {response.decode('utf-8')}")
time.sleep(5)
except KeyboardInterrupt:
print("\n์ฐ๊ฒฐ ์ข
๋ฃ")
if __name__ == "__main__":
send_sensor_data()
2.4 UDP ์์ผ¶
#!/usr/bin/env python3
"""UDP ์์ผ ํต์ (๋น ๋ฅธ ์ผ์ ๋ฐ์ดํฐ ์ ์ก)"""
import socket
import json
import time
# === UDP ์๋ฒ ===
def udp_server(port: int = 9998):
"""UDP ์๋ฒ"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', port))
print(f"UDP ์๋ฒ ์์: ํฌํธ {port}")
while True:
data, addr = sock.recvfrom(1024)
message = json.loads(data.decode('utf-8'))
print(f"[{addr}] {message}")
# === UDP ํด๋ผ์ด์ธํธ ===
def udp_client(server_ip: str, port: int = 9998):
"""UDP ํด๋ผ์ด์ธํธ"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sensor_data = {
"sensor_id": "motion_01",
"motion_detected": True,
"timestamp": time.time()
}
message = json.dumps(sensor_data).encode('utf-8')
sock.sendto(message, (server_ip, port))
print(f"์ ์ก ์๋ฃ: {sensor_data}")
sock.close()
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'server':
udp_server()
else:
udp_client('192.168.1.100')
3. ESP32 WiFi ๊ฐ์¶
3.1 ESP32์ ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด ๋น๊ต¶
| ํน์ฑ | ESP32 | Raspberry Pi |
|---|---|---|
| ํ๋ก์ธ์ | Xtensa 240MHz | ARM 1.5GHz |
| RAM | 520KB | 1-8GB |
| OS | FreeRTOS/์์ | Linux |
| ์ธ์ด | C/C++, MicroPython | Python, ๋ชจ๋ ์ธ์ด |
| WiFi | ๋ด์ฅ | ๋ด์ฅ (Pi 3+) |
| ์ ๋ ฅ | ๋ฎ์ (80mA) | ๋์ (700mA+) |
| ์ฉ๋ | ์ผ์ ๋ ธ๋ | ๊ฒ์ดํธ์จ์ด, ์ฃ์ง |
3.2 ESP32 MicroPython WiFi ์์ ¶
# ESP32์ฉ MicroPython ์ฝ๋
# (์ฐธ๊ณ ์ฉ - ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด์์๋ ์คํ ๋ถ๊ฐ)
import network
import time
def connect_wifi(ssid: str, password: str) -> str:
"""ESP32 WiFi ์ฐ๊ฒฐ"""
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print(f'WiFi ์ฐ๊ฒฐ ์ค: {ssid}')
wlan.connect(ssid, password)
# ์ฐ๊ฒฐ ๋๊ธฐ
timeout = 10
while not wlan.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
if wlan.isconnected():
ip = wlan.ifconfig()[0]
print(f'์ฐ๊ฒฐ๋จ! IP: {ip}')
return ip
else:
print('์ฐ๊ฒฐ ์คํจ')
return None
# ์ฌ์ฉ
# ip = connect_wifi("MySSID", "MyPassword")
3.3 ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด - ESP32 ํต์ ์ํคํ ์ฒ¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด - ESP32 ํต์ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโโโโโโ WiFi โโโโโโโโโโโโโโโโ โ
โ โ โโโโโโโโโโโโโโโโโโโโโโโถโ โ โ
โ โ Raspberry โ โ ESP32 โ โ
โ โ Pi โ โ ์ผ์ ๋
ธ๋ โ โ
โ โ โ โ โ โ
โ โ - MQTT โ TCP/UDP โ - ์จ๋ ์ผ์ โ โ
โ โ Broker โโโโโโโโโโโโโโโโโโโโโโโถโ - ์ต๋ ์ผ์ โ โ
โ โ - ๋ฐ์ดํฐ โ โ - ๋ชจ์
์ผ์ โ โ
โ โ ์์ง โ HTTP โ โ โ
โ โ - ๋ถ์ โโโโโโโโโโโโโโโโโโโโโโโถโ ์ ์ ๋ ฅ โ โ
โ โ โ โ ๋์ โ โ
โ โโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโ โ
โ โ โ โ
โ โ โ โ
โ โผ โผ โ
โ โโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโ โ
โ โ ํด๋ผ์ฐ๋ โ โ ๋ฐฐํฐ๋ฆฌ โ โ
โ โ AWS/GCP โ โ ๋์ ๊ฐ๋ฅ โ โ
โ โโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
4. ๋คํธ์ํฌ ์ค์บ ๋ฐ ๋ชจ๋ํฐ๋ง¶
4.1 ๋คํธ์ํฌ ์ฅ์น ์ค์บ¶
#!/usr/bin/env python3
"""๋คํธ์ํฌ ์ฅ์น ์ค์บ"""
import subprocess
import re
from concurrent.futures import ThreadPoolExecutor
import socket
def get_local_network() -> str:
"""๋ก์ปฌ ๋คํธ์ํฌ ์ฃผ์ ๋ฐํ"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
# ๋คํธ์ํฌ ์ฃผ์ ์ถ์ถ (์: 192.168.1.0/24)
parts = ip.split('.')
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
def ping_host(ip: str) -> dict | None:
"""๋จ์ผ ํธ์คํธ ํ"""
try:
result = subprocess.run(
['ping', '-c', '1', '-W', '1', ip],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
return {'ip': ip, 'status': 'up'}
except:
pass
return None
def scan_network(network: str = None) -> list:
"""๋คํธ์ํฌ ์ ์ฒด ์ค์บ"""
if network is None:
network = get_local_network()
# IP ๋ฒ์ ์์ฑ
base = '.'.join(network.split('.')[:-1])
ips = [f"{base}.{i}" for i in range(1, 255)]
print(f"์ค์บ ์ค: {network}")
results = []
with ThreadPoolExecutor(max_workers=50) as executor:
for result in executor.map(ping_host, ips):
if result:
results.append(result)
print(f" ๋ฐ๊ฒฌ: {result['ip']}")
return results
def get_hostname(ip: str) -> str:
"""IP ์ฃผ์์์ ํธ์คํธ๋ช
์กฐํ"""
try:
return socket.gethostbyaddr(ip)[0]
except:
return "Unknown"
if __name__ == "__main__":
devices = scan_network()
print(f"\n=== ๋ฐ๊ฒฌ๋ ์ฅ์น: {len(devices)}๊ฐ ===")
for device in devices:
hostname = get_hostname(device['ip'])
print(f" {device['ip']:15} - {hostname}")
4.2 ํฌํธ ์ค์บ¶
#!/usr/bin/env python3
"""๊ฐ๋จํ ํฌํธ ์ค์บ"""
import socket
from concurrent.futures import ThreadPoolExecutor
COMMON_PORTS = {
22: 'SSH',
80: 'HTTP',
443: 'HTTPS',
1883: 'MQTT',
3306: 'MySQL',
5432: 'PostgreSQL',
8080: 'HTTP-Alt',
8883: 'MQTT-TLS'
}
def check_port(target: str, port: int) -> dict | None:
"""ํฌํธ ์ด๋ฆผ ํ์ธ"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
try:
result = sock.connect_ex((target, port))
if result == 0:
return {
'port': port,
'status': 'open',
'service': COMMON_PORTS.get(port, 'unknown')
}
except:
pass
finally:
sock.close()
return None
def scan_ports(target: str, ports: list = None) -> list:
"""์ฌ๋ฌ ํฌํธ ์ค์บ"""
if ports is None:
ports = list(COMMON_PORTS.keys())
print(f"ํฌํธ ์ค์บ: {target}")
results = []
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {executor.submit(check_port, target, port): port for port in ports}
for future in futures:
result = future.result()
if result:
results.append(result)
print(f" ํฌํธ {result['port']} ({result['service']}): OPEN")
return results
if __name__ == "__main__":
target = input("์ค์บํ IP ์ฃผ์: ")
scan_ports(target)
5. HTTP ํด๋ผ์ด์ธํธ¶
5.1 requests ๋ผ์ด๋ธ๋ฌ๋ฆฌ¶
#!/usr/bin/env python3
"""HTTP ํด๋ผ์ด์ธํธ - ์ผ์ ๋ฐ์ดํฐ ์ ์ก"""
import requests
import time
import json
API_BASE = "http://192.168.1.100:5000/api"
def send_sensor_data(sensor_id: str, data: dict) -> bool:
"""์ผ์ ๋ฐ์ดํฐ POST ์ ์ก"""
url = f"{API_BASE}/sensors/{sensor_id}/data"
try:
response = requests.post(
url,
json=data,
headers={"Content-Type": "application/json"},
timeout=5
)
if response.status_code == 201:
print(f"๋ฐ์ดํฐ ์ ์ก ์ฑ๊ณต: {data}")
return True
else:
print(f"์ ์ก ์คํจ: {response.status_code} - {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"๋คํธ์ํฌ ์ค๋ฅ: {e}")
return False
def get_sensor_config(sensor_id: str) -> dict | None:
"""์ผ์ ์ค์ ์กฐํ"""
url = f"{API_BASE}/sensors/{sensor_id}/config"
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
return response.json()
except requests.exceptions.RequestException as e:
print(f"์กฐํ ์คํจ: {e}")
return None
def periodic_reporting(sensor_id: str, interval: int = 10):
"""์ฃผ๊ธฐ์ ๋ฐ์ดํฐ ๋ฆฌํฌํ
"""
import random
print(f"์ผ์ {sensor_id} ๋ฆฌํฌํ
์์ (๊ฐ๊ฒฉ: {interval}์ด)")
while True:
data = {
"temperature": round(random.uniform(20, 30), 1),
"humidity": round(random.uniform(40, 70), 1),
"timestamp": int(time.time())
}
send_sensor_data(sensor_id, data)
time.sleep(interval)
if __name__ == "__main__":
periodic_reporting("sensor_001", 10)
5.2 ๋น๋๊ธฐ HTTP ํด๋ผ์ด์ธํธ¶
#!/usr/bin/env python3
"""๋น๋๊ธฐ HTTP ํด๋ผ์ด์ธํธ (aiohttp)"""
import asyncio
import aiohttp
import time
API_BASE = "http://192.168.1.100:5000/api"
async def send_data_async(session: aiohttp.ClientSession,
sensor_id: str,
data: dict) -> bool:
"""๋น๋๊ธฐ ๋ฐ์ดํฐ ์ ์ก"""
url = f"{API_BASE}/sensors/{sensor_id}/data"
try:
async with session.post(url, json=data) as response:
if response.status == 201:
return True
except aiohttp.ClientError as e:
print(f"์ค๋ฅ: {e}")
return False
async def batch_send(sensors: list, data_list: list):
"""์ฌ๋ฌ ์ผ์ ๋ฐ์ดํฐ ๋์ ์ ์ก"""
async with aiohttp.ClientSession() as session:
tasks = [
send_data_async(session, sensor, data)
for sensor, data in zip(sensors, data_list)
]
results = await asyncio.gather(*tasks)
success = sum(results)
print(f"์ ์ก ์๋ฃ: {success}/{len(results)}")
if __name__ == "__main__":
sensors = ["sensor_001", "sensor_002", "sensor_003"]
data_list = [
{"temperature": 25.5, "timestamp": time.time()},
{"temperature": 26.0, "timestamp": time.time()},
{"temperature": 24.8, "timestamp": time.time()}
]
asyncio.run(batch_send(sensors, data_list))
5.3 HTTP ํด๋ผ์ด์ธํธ with ์ฌ์๋¶
#!/usr/bin/env python3
"""์ฌ์๋ ๋ก์ง์ด ์๋ HTTP ํด๋ผ์ด์ธํธ"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
def create_session_with_retry(
retries: int = 3,
backoff_factor: float = 0.5,
status_forcelist: tuple = (500, 502, 503, 504)
) -> requests.Session:
"""์ฌ์๋ ์ค์ ์ด ๋ ์ธ์
์์ฑ"""
session = requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
class IoTHttpClient:
"""IoT์ฉ HTTP ํด๋ผ์ด์ธํธ"""
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self.session = create_session_with_retry()
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'IoT-Sensor/1.0'
})
def send_data(self, endpoint: str, data: dict) -> dict:
"""๋ฐ์ดํฐ ์ ์ก"""
url = f"{self.base_url}/{endpoint}"
try:
response = self.session.post(url, json=data, timeout=10)
response.raise_for_status()
return {"success": True, "data": response.json()}
except requests.exceptions.RequestException as e:
return {"success": False, "error": str(e)}
def get_config(self, endpoint: str) -> dict:
"""์ค์ ์กฐํ"""
url = f"{self.base_url}/{endpoint}"
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return {"success": True, "data": response.json()}
except requests.exceptions.RequestException as e:
return {"success": False, "error": str(e)}
def close(self):
"""์ธ์
์ข
๋ฃ"""
self.session.close()
# ์ฌ์ฉ ์
if __name__ == "__main__":
client = IoTHttpClient("http://192.168.1.100:5000/api")
result = client.send_data("sensors/001/data", {
"temperature": 25.5,
"timestamp": time.time()
})
print(result)
client.close()
์ฐ์ต ๋ฌธ์ ¶
๋ฌธ์ 1: WiFi ๋ชจ๋ํฐ๋ง¶
- ํ์ฌ WiFi ์ฐ๊ฒฐ ์ํ๋ฅผ ๋ชจ๋ํฐ๋งํ๋ ์คํฌ๋ฆฝํธ๋ฅผ ์์ฑํ์ธ์.
- ์ ํธ ๊ฐ๋๊ฐ -70dBm ์ดํ๋ก ๋จ์ด์ง๋ฉด ๊ฒฝ๊ณ ๋ฅผ ์ถ๋ ฅํ์ธ์.
๋ฌธ์ 2: ๋ก์ปฌ ์๋ฒ¶
- TCP ์๋ฒ๋ฅผ ์์ฑํ์ฌ ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์์ ํ์ธ์.
- ์์ ๋ ๋ฐ์ดํฐ๋ฅผ ํ์ผ์ ์ ์ฅํ์ธ์.
๋ฌธ์ 3: HTTP ๋ฆฌํฌํฐ¶
- ์จ๋ ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ธฐ์ ์ผ๋ก HTTP POSTํ๋ ํด๋ผ์ด์ธํธ๋ฅผ ์์ฑํ์ธ์.
- ๋คํธ์ํฌ ์ค๋ฅ ์ ์ฌ์๋ ๋ก์ง์ ๊ตฌํํ์ธ์.
๋ค์ ๋จ๊ณ¶
- 05_BLE_Connectivity.md: BLE ํต์ ์ผ๋ก ์ ์ ๋ ฅ ์ผ์ ์ฐ๊ฒฐ
- 06_MQTT_Protocol.md: MQTT๋ก ํจ์จ์ ์ธ IoT ๋ฉ์์ง
์ต์ข ์ ๋ฐ์ดํธ: 2026-02-01