1#!/usr/bin/env python3
2"""
3WiFi ๋คํธ์ํฌ ์ค์บ๋ ๋ฐ ์์ผ ํ๋ก๊ทธ๋๋ฐ ์์
4
5์ด ์คํฌ๋ฆฝํธ๋ ๋ค์ ๊ธฐ๋ฅ์ ์ ๊ณตํฉ๋๋ค:
61. WiFi ๋คํธ์ํฌ ์ค์บ (์๋ฎฌ๋ ์ด์
๋ชจ๋ ์ง์)
72. TCP ํด๋ผ์ด์ธํธ/์๋ฒ ์์
83. HTTP ํด๋ผ์ด์ธํธ๋ก ์ผ์ ๋ฐ์ดํฐ ์ ์ก
94. ๋คํธ์ํฌ ์ฐ๊ฒฐ ์ํ ๋ชจ๋ํฐ๋ง
105. ๋คํธ์ํฌ ์ค๋ฅ ์ฒ๋ฆฌ ๋ฐ ์ฌ์๋ ๋ก์ง
11
12์ฐธ๊ณ : content/ko/IoT_Embedded/04_WiFi_Networking.md
13"""
14
15import subprocess
16import re
17import socket
18import json
19import time
20import random
21import sys
22from typing import Optional, List, Dict
23from datetime import datetime
24
25
26# ============================================================================
27# WiFi ๋คํธ์ํฌ ์ค์บ
28# ============================================================================
29
30def is_raspberry_pi() -> bool:
31 """๋ผ์ฆ๋ฒ ๋ฆฌํ์ด ํ๊ฒฝ์ธ์ง ํ์ธ"""
32 try:
33 with open('/proc/cpuinfo', 'r') as f:
34 cpuinfo = f.read()
35 return 'BCM' in cpuinfo or 'Raspberry' in cpuinfo
36 except FileNotFoundError:
37 return False
38
39
40def get_wifi_info() -> dict:
41 """
42 ํ์ฌ WiFi ์ฐ๊ฒฐ ์ ๋ณด ์กฐํ
43
44 Returns:
45 dict: WiFi ์ ๋ณด (ssid, ip_addresses, signal_dbm, mac_address)
46 """
47 info = {}
48
49 if not is_raspberry_pi():
50 # ์๋ฎฌ๋ ์ด์
๋ชจ๋
51 print("[์๋ฎฌ๋ ์ด์
] ์ค์ ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด๊ฐ ์๋๋๋ค. ๊ฐ์ ๋ฐ์ดํฐ ์ฌ์ฉ")
52 info = {
53 'ssid': 'SimulatedWiFi',
54 'ip_addresses': ['192.168.1.100'],
55 'signal_dbm': -45,
56 'mac_address': 'AA:BB:CC:DD:EE:FF'
57 }
58 return info
59
60 try:
61 # SSID ์กฐํ
62 result = subprocess.run(
63 ['iwgetid', '-r'],
64 capture_output=True,
65 text=True,
66 timeout=5
67 )
68 info['ssid'] = result.stdout.strip()
69
70 # IP ์ฃผ์ ์กฐํ
71 result = subprocess.run(
72 ['hostname', '-I'],
73 capture_output=True,
74 text=True,
75 timeout=5
76 )
77 ips = result.stdout.strip().split()
78 info['ip_addresses'] = ips
79
80 # ์ ํธ ๊ฐ๋ ์กฐํ
81 result = subprocess.run(
82 ['iwconfig', 'wlan0'],
83 capture_output=True,
84 text=True,
85 timeout=5
86 )
87 match = re.search(r'Signal level=(-?\d+)', result.stdout)
88 if match:
89 info['signal_dbm'] = int(match.group(1))
90
91 # MAC ์ฃผ์
92 result = subprocess.run(
93 ['cat', '/sys/class/net/wlan0/address'],
94 capture_output=True,
95 text=True,
96 timeout=5
97 )
98 info['mac_address'] = result.stdout.strip()
99
100 except Exception as e:
101 info['error'] = str(e)
102
103 return info
104
105
106def get_wifi_networks() -> List[dict]:
107 """
108 ์ฃผ๋ณ WiFi ๋คํธ์ํฌ ์ค์บ
109
110 Returns:
111 list: ๋คํธ์ํฌ ์ ๋ณด ๋ฆฌ์คํธ (ssid, quality, signal_dbm)
112 """
113 networks = []
114
115 if not is_raspberry_pi():
116 # ์๋ฎฌ๋ ์ด์
๋ชจ๋
117 print("[์๋ฎฌ๋ ์ด์
] WiFi ์ค์บ ๊ฒฐ๊ณผ ์์ฑ ์ค...")
118 networks = [
119 {'ssid': 'HomeNetwork', 'quality': '70/70', 'signal_dbm': -35},
120 {'ssid': 'OfficeWiFi', 'quality': '65/70', 'signal_dbm': -42},
121 {'ssid': 'GuestNetwork', 'quality': '50/70', 'signal_dbm': -58},
122 {'ssid': 'Neighbor_2.4G', 'quality': '40/70', 'signal_dbm': -65},
123 {'ssid': 'CafeWiFi', 'quality': '30/70', 'signal_dbm': -75},
124 ]
125 return networks
126
127 try:
128 result = subprocess.run(
129 ['sudo', 'iwlist', 'wlan0', 'scan'],
130 capture_output=True,
131 text=True,
132 timeout=10
133 )
134
135 current_network = {}
136 for line in result.stdout.split('\n'):
137 if 'ESSID:' in line:
138 ssid = re.search(r'ESSID:"(.+)"', line)
139 if ssid and current_network:
140 networks.append(current_network)
141 current_network = {'ssid': ssid.group(1) if ssid else ''}
142
143 elif 'Quality=' in line:
144 quality = re.search(r'Quality=(\d+)/(\d+)', line)
145 if quality:
146 current_network['quality'] = f"{quality.group(1)}/{quality.group(2)}"
147
148 signal = re.search(r'Signal level=(-?\d+)', line)
149 if signal:
150 current_network['signal_dbm'] = int(signal.group(1))
151
152 if current_network:
153 networks.append(current_network)
154
155 except Exception as e:
156 print(f"์ค์บ ์คํจ: {e}")
157 print("์๋ฎฌ๋ ์ด์
๋ชจ๋๋ก ์ ํํฉ๋๋ค...")
158 return get_wifi_networks() # ์ฌ๊ท ํธ์ถ๋ก ์๋ฎฌ๋ ์ด์
๋ชจ๋ ์ฌ์ฉ
159
160 return networks
161
162
163def monitor_wifi_signal(threshold_dbm: int = -70, interval: int = 5, duration: int = 30):
164 """
165 WiFi ์ ํธ ๊ฐ๋ ๋ชจ๋ํฐ๋ง
166
167 Args:
168 threshold_dbm: ๊ฒฝ๊ณ ์๊ณ๊ฐ (dBm)
169 interval: ํ์ธ ๊ฐ๊ฒฉ (์ด)
170 duration: ๋ชจ๋ํฐ๋ง ๊ธฐ๊ฐ (์ด)
171 """
172 print(f"=== WiFi ์ ํธ ๋ชจ๋ํฐ๋ง ์์ ===")
173 print(f"๊ฒฝ๊ณ ์๊ณ๊ฐ: {threshold_dbm} dBm")
174 print(f"ํ์ธ ๊ฐ๊ฒฉ: {interval}์ด, ์ด {duration}์ด")
175 print()
176
177 start_time = time.time()
178
179 while time.time() - start_time < duration:
180 info = get_wifi_info()
181
182 if 'error' in info:
183 print(f"[{datetime.now().strftime('%H:%M:%S')}] ์ค๋ฅ: {info['error']}")
184 else:
185 signal = info.get('signal_dbm', 0)
186 ssid = info.get('ssid', 'Unknown')
187
188 status = "์ํธ" if signal > threshold_dbm else "โ ๏ธ ๊ฒฝ๊ณ "
189 print(f"[{datetime.now().strftime('%H:%M:%S')}] {ssid}: {signal} dBm ({status})")
190
191 if signal <= threshold_dbm:
192 print(f" โ ๏ธ ์ ํธ๊ฐ ์ฝํฉ๋๋ค! ({signal} dBm <= {threshold_dbm} dBm)")
193
194 time.sleep(interval)
195
196 print("\n๋ชจ๋ํฐ๋ง ์ข
๋ฃ")
197
198
199# ============================================================================
200# TCP ์์ผ ํ๋ก๊ทธ๋๋ฐ
201# ============================================================================
202
203def tcp_server(host: str = '0.0.0.0', port: int = 9999):
204 """
205 TCP ์๋ฒ - ์ผ์ ๋ฐ์ดํฐ ์์
206
207 Args:
208 host: ๋ฐ์ธ๋ํ IP ์ฃผ์
209 port: ํฌํธ ๋ฒํธ
210 """
211 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
212 # ์ฃผ์ ์ฌ์ฌ์ฉ ํ์ฉ
213 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
214
215 server.bind((host, port))
216 server.listen(5)
217
218 print(f"TCP ์๋ฒ ์์: {host}:{port}")
219 print("ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ๋๊ธฐ ์ค... (Ctrl+C๋ก ์ข
๋ฃ)")
220
221 try:
222 while True:
223 client, address = server.accept()
224 print(f"\nํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ: {address}")
225
226 with client:
227 while True:
228 data = client.recv(1024)
229 if not data:
230 break
231
232 try:
233 # JSON ๋ฐ์ดํฐ ํ์ฑ
234 message = json.loads(data.decode('utf-8'))
235 timestamp = datetime.now().strftime("%H:%M:%S")
236 print(f"[{timestamp}] ์์ : {message}")
237
238 # ์๋ต ์ ์ก
239 response = {
240 "status": "ok",
241 "received": message.get("sensor_id"),
242 "timestamp": time.time()
243 }
244 client.sendall(json.dumps(response).encode('utf-8'))
245
246 except json.JSONDecodeError:
247 print(f"์๋ชป๋ JSON: {data}")
248
249 print(f"ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ์ข
๋ฃ: {address}")
250
251 except KeyboardInterrupt:
252 print("\n์๋ฒ ์ข
๋ฃ")
253
254
255def tcp_client(server_host: str, server_port: int = 9999, count: int = 10):
256 """
257 TCP ํด๋ผ์ด์ธํธ - ์ผ์ ๋ฐ์ดํฐ ์ ์ก
258
259 Args:
260 server_host: ์๋ฒ IP ์ฃผ์
261 server_port: ์๋ฒ ํฌํธ
262 count: ์ ์ก ํ์
263 """
264 try:
265 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
266 print(f"์๋ฒ ์ฐ๊ฒฐ ์ค: {server_host}:{server_port}")
267 client.connect((server_host, server_port))
268 print(f"์๋ฒ ์ฐ๊ฒฐ ์ฑ๊ณต!")
269
270 sensor_id = "temp_sensor_01"
271
272 for i in range(count):
273 # ์ผ์ ๋ฐ์ดํฐ ์์ฑ (์๋ฎฌ๋ ์ด์
)
274 data = {
275 "sensor_id": sensor_id,
276 "temperature": round(random.uniform(20, 30), 1),
277 "humidity": round(random.uniform(40, 70), 1),
278 "timestamp": time.time(),
279 "sequence": i + 1
280 }
281
282 # ์ ์ก
283 message = json.dumps(data).encode('utf-8')
284 client.sendall(message)
285 print(f"[{i+1}/{count}] ์ ์ก: ์จ๋={data['temperature']}ยฐC, ์ต๋={data['humidity']}%")
286
287 # ์๋ต ์์
288 try:
289 client.settimeout(5.0)
290 response = client.recv(1024)
291 if response:
292 resp_data = json.loads(response.decode('utf-8'))
293 print(f" ์๋ต: {resp_data['status']}")
294 except socket.timeout:
295 print(f" ์๋ต ํ์์์")
296
297 if i < count - 1:
298 time.sleep(2)
299
300 print("\n์ ์ก ์๋ฃ")
301
302 except ConnectionRefusedError:
303 print(f"์ค๋ฅ: ์๋ฒ์ ์ฐ๊ฒฐํ ์ ์์ต๋๋ค ({server_host}:{server_port})")
304 print("์๋ฒ๊ฐ ์คํ ์ค์ธ์ง ํ์ธํ์ธ์.")
305 except Exception as e:
306 print(f"์ค๋ฅ: {e}")
307
308
309# ============================================================================
310# HTTP ํด๋ผ์ด์ธํธ
311# ============================================================================
312
313def send_sensor_data_http(api_base: str, sensor_id: str, data: dict) -> bool:
314 """
315 HTTP POST๋ก ์ผ์ ๋ฐ์ดํฐ ์ ์ก
316
317 Args:
318 api_base: API ๋ฒ ์ด์ค URL
319 sensor_id: ์ผ์ ID
320 data: ์ ์กํ ๋ฐ์ดํฐ
321
322 Returns:
323 bool: ์ฑ๊ณต ์ฌ๋ถ
324 """
325 try:
326 import requests
327 except ImportError:
328 print("requests ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ํ์ํฉ๋๋ค: pip install requests")
329 return False
330
331 url = f"{api_base}/sensors/{sensor_id}/data"
332
333 try:
334 response = requests.post(
335 url,
336 json=data,
337 headers={"Content-Type": "application/json"},
338 timeout=5
339 )
340
341 if response.status_code in (200, 201):
342 print(f"๋ฐ์ดํฐ ์ ์ก ์ฑ๊ณต: {data}")
343 return True
344 else:
345 print(f"์ ์ก ์คํจ: {response.status_code} - {response.text}")
346 return False
347
348 except requests.exceptions.RequestException as e:
349 print(f"๋คํธ์ํฌ ์ค๋ฅ: {e}")
350 return False
351
352
353def http_client_with_retry(api_base: str, sensor_id: str, retries: int = 3):
354 """
355 ์ฌ์๋ ๋ก์ง์ด ์๋ HTTP ํด๋ผ์ด์ธํธ
356
357 Args:
358 api_base: API ๋ฒ ์ด์ค URL
359 sensor_id: ์ผ์ ID
360 retries: ์ฌ์๋ ํ์
361 """
362 print(f"=== HTTP ํด๋ผ์ด์ธํธ (์ฌ์๋: {retries}ํ) ===")
363 print(f"API: {api_base}")
364 print(f"์ผ์ ID: {sensor_id}\n")
365
366 for i in range(10):
367 data = {
368 "temperature": round(random.uniform(20, 30), 1),
369 "humidity": round(random.uniform(40, 70), 1),
370 "timestamp": int(time.time())
371 }
372
373 success = False
374 for attempt in range(retries):
375 if send_sensor_data_http(api_base, sensor_id, data):
376 success = True
377 break
378 else:
379 if attempt < retries - 1:
380 wait_time = 2 ** attempt # Exponential backoff
381 print(f" ์ฌ์๋ ๋๊ธฐ: {wait_time}์ด...")
382 time.sleep(wait_time)
383
384 if not success:
385 print(f" โ {retries}๋ฒ ์ฌ์๋ ํ ์คํจ")
386
387 time.sleep(5)
388
389
390# ============================================================================
391# ๋คํธ์ํฌ ์ฐ๊ฒฐ ์ํ ํ์ธ
392# ============================================================================
393
394def check_internet_connection(host: str = "8.8.8.8", port: int = 53, timeout: int = 3) -> bool:
395 """
396 ์ธํฐ๋ท ์ฐ๊ฒฐ ํ์ธ
397
398 Args:
399 host: ํ
์คํธํ ํธ์คํธ (๊ธฐ๋ณธ: Google DNS)
400 port: ํฌํธ
401 timeout: ํ์์์ (์ด)
402
403 Returns:
404 bool: ์ฐ๊ฒฐ ๊ฐ๋ฅ ์ฌ๋ถ
405 """
406 try:
407 socket.setdefaulttimeout(timeout)
408 socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
409 return True
410 except socket.error:
411 return False
412
413
414def get_local_ip() -> str:
415 """
416 ๋ก์ปฌ IP ์ฃผ์ ์กฐํ
417
418 Returns:
419 str: IP ์ฃผ์
420 """
421 try:
422 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
423 s.connect(('8.8.8.8', 80))
424 ip = s.getsockname()[0]
425 s.close()
426 return ip
427 except Exception:
428 return "127.0.0.1"
429
430
431def network_status():
432 """๋คํธ์ํฌ ์ํ ํ์ธ"""
433 print("=== ๋คํธ์ํฌ ์ํ ===\n")
434
435 # ๋ก์ปฌ IP
436 local_ip = get_local_ip()
437 print(f"๋ก์ปฌ IP: {local_ip}")
438
439 # ์ธํฐ๋ท ์ฐ๊ฒฐ
440 internet = check_internet_connection()
441 print(f"์ธํฐ๋ท ์ฐ๊ฒฐ: {'โ ์ฐ๊ฒฐ๋จ' if internet else 'โ ์ฐ๊ฒฐ ์๋จ'}")
442
443 # WiFi ์ ๋ณด
444 wifi_info = get_wifi_info()
445 if 'error' not in wifi_info:
446 print(f"\nWiFi SSID: {wifi_info.get('ssid', 'N/A')}")
447 print(f"์ ํธ ๊ฐ๋: {wifi_info.get('signal_dbm', 'N/A')} dBm")
448 print(f"MAC ์ฃผ์: {wifi_info.get('mac_address', 'N/A')}")
449
450
451# ============================================================================
452# ๋ฉ์ธ ํจ์
453# ============================================================================
454
455def main():
456 """๋ฉ์ธ ํจ์"""
457 if len(sys.argv) < 2:
458 print("WiFi ๋คํธ์ํฌ ์ค์บ๋ ๋ฐ ์์ผ ํ๋ก๊ทธ๋๋ฐ ์์ ")
459 print("\n์ฌ์ฉ๋ฒ:")
460 print(" python wifi_scanner.py info - WiFi ์ ๋ณด ์กฐํ")
461 print(" python wifi_scanner.py scan - WiFi ๋คํธ์ํฌ ์ค์บ")
462 print(" python wifi_scanner.py monitor - WiFi ์ ํธ ๋ชจ๋ํฐ๋ง")
463 print(" python wifi_scanner.py status - ๋คํธ์ํฌ ์ํ ํ์ธ")
464 print(" python wifi_scanner.py server - TCP ์๋ฒ ์์")
465 print(" python wifi_scanner.py client <IP> - TCP ํด๋ผ์ด์ธํธ ์์")
466 print("\n์์ :")
467 print(" python wifi_scanner.py info")
468 print(" python wifi_scanner.py client 192.168.1.100")
469 return
470
471 command = sys.argv[1].lower()
472
473 if command == 'info':
474 print("=== WiFi ์ฐ๊ฒฐ ์ ๋ณด ===")
475 info = get_wifi_info()
476 for key, value in info.items():
477 print(f" {key}: {value}")
478
479 elif command == 'scan':
480 print("=== ์ฃผ๋ณ WiFi ๋คํธ์ํฌ ===")
481 networks = get_wifi_networks()
482 print(f"๋ฐ๊ฒฌ๋ ๋คํธ์ํฌ: {len(networks)}๊ฐ\n")
483 for net in networks:
484 ssid = net.get('ssid', 'Unknown')
485 signal = net.get('signal_dbm', 'N/A')
486 quality = net.get('quality', 'N/A')
487 print(f" {ssid:20} - ์ ํธ: {signal:4} dBm, ํ์ง: {quality}")
488
489 elif command == 'monitor':
490 monitor_wifi_signal(threshold_dbm=-70, interval=5, duration=30)
491
492 elif command == 'status':
493 network_status()
494
495 elif command == 'server':
496 tcp_server()
497
498 elif command == 'client':
499 if len(sys.argv) < 3:
500 print("์ค๋ฅ: ์๋ฒ IP ์ฃผ์๋ฅผ ์
๋ ฅํ์ธ์")
501 print("์์ : python wifi_scanner.py client 192.168.1.100")
502 return
503 server_ip = sys.argv[2]
504 tcp_client(server_ip, count=10)
505
506 else:
507 print(f"์ ์ ์๋ ๋ช
๋ น: {command}")
508 print("'python wifi_scanner.py'๋ฅผ ์คํํ์ฌ ๋์๋ง์ ํ์ธํ์ธ์")
509
510
511if __name__ == "__main__":
512 main()