1#!/usr/bin/env python3
2"""
3BLE (Bluetooth Low Energy) 장치 스캐너 및 GATT 클라이언트 예제
4
5이 스크립트는 다음 기능을 제공합니다:
61. BLE 장치 스캔 (시뮬레이션 모드 지원)
72. GATT 서비스 및 특성 탐색
83. BLE 특성 값 읽기
94. BLE 알림(Notification) 수신
105. 센서 데이터 수신 예제
11
12참고: content/ko/IoT_Embedded/05_BLE_Connectivity.md
13
14주의: 실제 BLE 기능을 사용하려면 bleak 라이브러리가 필요합니다.
15 pip install bleak
16"""
17
18import asyncio
19import sys
20import time
21import random
22import struct
23from typing import Optional, List, Dict, Callable
24from datetime import datetime
25
26
27# ============================================================================
28# BLE 라이브러리 임포트 (선택적)
29# ============================================================================
30
31try:
32 from bleak import BleakScanner, BleakClient
33 BLEAK_AVAILABLE = True
34except ImportError:
35 BLEAK_AVAILABLE = False
36 print("⚠️ bleak 라이브러리를 찾을 수 없습니다.")
37 print(" 시뮬레이션 모드로 실행됩니다.")
38 print(" 실제 BLE 기능을 사용하려면 'pip install bleak'를 실행하세요.\n")
39
40
41# ============================================================================
42# 표준 BLE UUID
43# ============================================================================
44
45class BLE_UUID:
46 """표준 BLE 서비스 및 특성 UUID"""
47
48 # 표준 서비스 UUID (16-bit)
49 GENERIC_ACCESS = "00001800-0000-1000-8000-00805f9b34fb"
50 GENERIC_ATTRIBUTE = "00001801-0000-1000-8000-00805f9b34fb"
51 DEVICE_INFORMATION = "0000180a-0000-1000-8000-00805f9b34fb"
52 BATTERY_SERVICE = "0000180f-0000-1000-8000-00805f9b34fb"
53 ENVIRONMENTAL_SENSING = "0000181a-0000-1000-8000-00805f9b34fb"
54 HEART_RATE = "0000180d-0000-1000-8000-00805f9b34fb"
55
56 # 표준 특성 UUID
57 DEVICE_NAME = "00002a00-0000-1000-8000-00805f9b34fb"
58 BATTERY_LEVEL = "00002a19-0000-1000-8000-00805f9b34fb"
59 TEMPERATURE = "00002a6e-0000-1000-8000-00805f9b34fb"
60 HUMIDITY = "00002a6f-0000-1000-8000-00805f9b34fb"
61 HEART_RATE_MEASUREMENT = "00002a37-0000-1000-8000-00805f9b34fb"
62
63 @staticmethod
64 def uuid_16_to_128(uuid_16: str) -> str:
65 """
66 16-bit UUID를 128-bit BLE 기본 UUID로 변환
67
68 Args:
69 uuid_16: 16-bit UUID (예: "0x180F")
70
71 Returns:
72 str: 128-bit UUID
73 """
74 base_uuid = "00000000-0000-1000-8000-00805f9b34fb"
75 uuid_16_clean = uuid_16.replace("0x", "").lower()
76 return f"0000{uuid_16_clean}{base_uuid[8:]}"
77
78
79# ============================================================================
80# 시뮬레이션 모드
81# ============================================================================
82
83class SimulatedBLEDevice:
84 """시뮬레이션용 BLE 장치"""
85
86 def __init__(self, name: str, address: str, rssi: int):
87 self.name = name
88 self.address = address
89 self.rssi = rssi
90
91 def __repr__(self):
92 return f"SimulatedBLEDevice(name='{self.name}', address='{self.address}', rssi={self.rssi})"
93
94
95def simulate_ble_scan(timeout: float = 10.0) -> List[SimulatedBLEDevice]:
96 """
97 BLE 스캔 시뮬레이션
98
99 Args:
100 timeout: 스캔 시간 (초)
101
102 Returns:
103 list: 시뮬레이션된 BLE 장치 리스트
104 """
105 print(f"[시뮬레이션] BLE 장치 스캔 중... ({timeout}초)")
106 time.sleep(2) # 스캔 시뮬레이션
107
108 devices = [
109 SimulatedBLEDevice("TempSensor-01", "AA:BB:CC:DD:EE:01", -45),
110 SimulatedBLEDevice("HeartRate-BLE", "AA:BB:CC:DD:EE:02", -52),
111 SimulatedBLEDevice("Battery-Monitor", "AA:BB:CC:DD:EE:03", -38),
112 SimulatedBLEDevice("EnvSensor", "AA:BB:CC:DD:EE:04", -61),
113 SimulatedBLEDevice("Smart-Watch", "AA:BB:CC:DD:EE:05", -55),
114 SimulatedBLEDevice(None, "AA:BB:CC:DD:EE:06", -72), # 이름 없는 장치
115 ]
116
117 return devices
118
119
120def simulate_read_characteristic(char_uuid: str) -> bytes:
121 """
122 BLE 특성 읽기 시뮬레이션
123
124 Args:
125 char_uuid: 특성 UUID
126
127 Returns:
128 bytes: 시뮬레이션 데이터
129 """
130 if BLE_UUID.BATTERY_LEVEL in char_uuid:
131 # 배터리 레벨 (0-100%)
132 return bytes([random.randint(50, 100)])
133
134 elif BLE_UUID.TEMPERATURE in char_uuid:
135 # 온도 (0.01도 단위, 16-bit 정수)
136 temp = random.uniform(20.0, 30.0)
137 temp_raw = int(temp * 100)
138 return struct.pack('<h', temp_raw)
139
140 elif BLE_UUID.HUMIDITY in char_uuid:
141 # 습도 (0.01% 단위, 16-bit 정수)
142 humidity = random.uniform(40.0, 70.0)
143 humidity_raw = int(humidity * 100)
144 return struct.pack('<H', humidity_raw)
145
146 else:
147 # 기본값
148 return b'\x00\x00'
149
150
151# ============================================================================
152# BLE 스캔 함수
153# ============================================================================
154
155async def scan_ble_devices(timeout: float = 10.0, use_simulation: bool = False) -> List:
156 """
157 BLE 장치 스캔
158
159 Args:
160 timeout: 스캔 시간 (초)
161 use_simulation: 강제로 시뮬레이션 모드 사용
162
163 Returns:
164 list: 발견된 BLE 장치 리스트
165 """
166 if not BLEAK_AVAILABLE or use_simulation:
167 return simulate_ble_scan(timeout)
168
169 print(f"BLE 장치 스캔 중... ({timeout}초)")
170
171 try:
172 devices = await BleakScanner.discover(timeout=timeout)
173 return devices
174 except Exception as e:
175 print(f"스캔 오류: {e}")
176 print("시뮬레이션 모드로 전환합니다...")
177 return simulate_ble_scan(timeout)
178
179
180async def scan_with_filter(name_filter: Optional[str] = None, timeout: float = 10.0) -> List:
181 """
182 필터링된 BLE 스캔
183
184 Args:
185 name_filter: 장치 이름 필터 (부분 일치)
186 timeout: 스캔 시간
187
188 Returns:
189 list: 필터링된 장치 리스트
190 """
191 devices = await scan_ble_devices(timeout)
192
193 if name_filter:
194 devices = [d for d in devices if d.name and name_filter.lower() in d.name.lower()]
195
196 return devices
197
198
199async def continuous_scan(duration: float = 30.0, callback: Optional[Callable] = None):
200 """
201 연속 BLE 스캔
202
203 Args:
204 duration: 스캔 기간 (초)
205 callback: 장치 발견 시 호출할 콜백 함수
206 """
207 if not BLEAK_AVAILABLE:
208 print("[시뮬레이션] 연속 스캔은 시뮬레이션 모드에서 지원되지 않습니다.")
209 devices = simulate_ble_scan(duration)
210 for device in devices:
211 print(f"발견: {device.name} ({device.address}) - RSSI: {device.rssi} dBm")
212 return
213
214 def detection_callback(device, advertisement_data):
215 print(f"발견: {device.name or 'Unknown'} ({device.address}) - RSSI: {device.rssi} dBm")
216 if callback:
217 callback(device, advertisement_data)
218
219 scanner = BleakScanner(detection_callback=detection_callback)
220
221 print(f"연속 스캔 시작 ({duration}초)")
222 await scanner.start()
223 await asyncio.sleep(duration)
224 await scanner.stop()
225 print("스캔 종료")
226
227
228# ============================================================================
229# BLE 연결 및 탐색
230# ============================================================================
231
232async def connect_and_explore(address: str, use_simulation: bool = False):
233 """
234 BLE 장치 연결 및 서비스/특성 탐색
235
236 Args:
237 address: BLE 장치 MAC 주소
238 use_simulation: 시뮬레이션 모드 사용
239 """
240 if not BLEAK_AVAILABLE or use_simulation:
241 print(f"[시뮬레이션] 연결 중: {address}")
242 print(f"[시뮬레이션] 연결됨!")
243 print(f"\n서비스: {BLE_UUID.ENVIRONMENTAL_SENSING}")
244 print(f" 설명: Environmental Sensing")
245 print(f" 특성: {BLE_UUID.TEMPERATURE}")
246 print(f" 속성: ['read', 'notify']")
247 print(f" 값: {simulate_read_characteristic(BLE_UUID.TEMPERATURE).hex()}")
248 print(f" 특성: {BLE_UUID.HUMIDITY}")
249 print(f" 속성: ['read', 'notify']")
250 print(f" 값: {simulate_read_characteristic(BLE_UUID.HUMIDITY).hex()}")
251 return
252
253 print(f"연결 중: {address}")
254
255 try:
256 async with BleakClient(address) as client:
257 print(f"연결됨! MTU: {client.mtu_size}")
258
259 # 서비스 탐색
260 for service in client.services:
261 print(f"\n서비스: {service.uuid}")
262 print(f" 설명: {service.description}")
263
264 # 특성 탐색
265 for char in service.characteristics:
266 print(f" 특성: {char.uuid}")
267 print(f" 속성: {char.properties}")
268
269 # 읽기 가능하면 값 읽기
270 if "read" in char.properties:
271 try:
272 value = await client.read_gatt_char(char.uuid)
273 print(f" 값: {value.hex()}")
274 except Exception as e:
275 print(f" 읽기 실패: {e}")
276
277 except Exception as e:
278 print(f"연결 오류: {e}")
279 print("시뮬레이션 모드로 재시도...")
280 await connect_and_explore(address, use_simulation=True)
281
282
283# ============================================================================
284# BLE 센서 데이터 읽기
285# ============================================================================
286
287async def read_sensor_data(address: str, use_simulation: bool = False) -> Dict:
288 """
289 BLE 센서 데이터 읽기
290
291 Args:
292 address: BLE 장치 주소
293 use_simulation: 시뮬레이션 모드
294
295 Returns:
296 dict: 센서 데이터
297 """
298 result = {}
299
300 if not BLEAK_AVAILABLE or use_simulation:
301 print(f"[시뮬레이션] 센서 데이터 읽기: {address}")
302
303 # 배터리
304 battery_data = simulate_read_characteristic(BLE_UUID.BATTERY_LEVEL)
305 result['battery'] = battery_data[0]
306
307 # 온도
308 temp_data = simulate_read_characteristic(BLE_UUID.TEMPERATURE)
309 temp_raw = struct.unpack('<h', temp_data)[0]
310 result['temperature'] = temp_raw * 0.01
311
312 # 습도
313 humidity_data = simulate_read_characteristic(BLE_UUID.HUMIDITY)
314 humidity_raw = struct.unpack('<H', humidity_data)[0]
315 result['humidity'] = humidity_raw * 0.01
316
317 return result
318
319 try:
320 async with BleakClient(address) as client:
321 # 배터리 레벨
322 try:
323 data = await client.read_gatt_char(BLE_UUID.BATTERY_LEVEL)
324 result['battery'] = data[0]
325 except Exception:
326 pass
327
328 # 온도
329 try:
330 data = await client.read_gatt_char(BLE_UUID.TEMPERATURE)
331 temp_raw = struct.unpack('<h', data[:2])[0]
332 result['temperature'] = temp_raw * 0.01
333 except Exception:
334 pass
335
336 # 습도
337 try:
338 data = await client.read_gatt_char(BLE_UUID.HUMIDITY)
339 humidity_raw = struct.unpack('<H', data[:2])[0]
340 result['humidity'] = humidity_raw * 0.01
341 except Exception:
342 pass
343
344 except Exception as e:
345 print(f"읽기 오류: {e}")
346 print("시뮬레이션 모드로 재시도...")
347 return await read_sensor_data(address, use_simulation=True)
348
349 return result
350
351
352# ============================================================================
353# BLE 알림 수신
354# ============================================================================
355
356def create_notification_handler(sensor_type: str):
357 """
358 알림 핸들러 생성
359
360 Args:
361 sensor_type: 센서 타입 ('temperature', 'humidity', etc.)
362
363 Returns:
364 function: 알림 핸들러 함수
365 """
366 def handler(sender, data):
367 timestamp = datetime.now().strftime("%H:%M:%S")
368 print(f"[{timestamp}] 수신 ({sensor_type}): {data.hex()}")
369
370 if sensor_type == 'temperature':
371 temp_raw = struct.unpack('<h', data[:2])[0]
372 temp = temp_raw * 0.01
373 print(f" 온도: {temp:.2f}°C")
374
375 elif sensor_type == 'humidity':
376 humidity_raw = struct.unpack('<H', data[:2])[0]
377 humidity = humidity_raw * 0.01
378 print(f" 습도: {humidity:.2f}%")
379
380 elif sensor_type == 'battery':
381 battery = data[0]
382 print(f" 배터리: {battery}%")
383
384 elif sensor_type == 'heart_rate':
385 flags = data[0]
386 if flags & 0x01: # 16-bit heart rate
387 hr = int.from_bytes(data[1:3], 'little')
388 else: # 8-bit heart rate
389 hr = data[1]
390 print(f" 심박수: {hr} bpm")
391
392 return handler
393
394
395async def subscribe_notifications(
396 address: str,
397 char_uuid: str,
398 sensor_type: str = 'unknown',
399 duration: float = 60,
400 use_simulation: bool = False
401):
402 """
403 BLE 알림 구독
404
405 Args:
406 address: BLE 장치 주소
407 char_uuid: 특성 UUID
408 sensor_type: 센서 타입
409 duration: 구독 기간 (초)
410 use_simulation: 시뮬레이션 모드
411 """
412 if not BLEAK_AVAILABLE or use_simulation:
413 print(f"[시뮬레이션] 알림 구독: {address}")
414 print(f"[시뮬레이션] 특성: {char_uuid}")
415 print(f"\n{duration}초 동안 시뮬레이션 데이터 수신...\n")
416
417 handler = create_notification_handler(sensor_type)
418
419 for i in range(int(duration / 2)):
420 # 시뮬레이션 데이터 생성
421 data = simulate_read_characteristic(char_uuid)
422 handler(char_uuid, data)
423 await asyncio.sleep(2)
424
425 print("\n구독 종료")
426 return
427
428 print(f"연결 중: {address}")
429
430 try:
431 async with BleakClient(address) as client:
432 print(f"연결됨!")
433
434 handler = create_notification_handler(sensor_type)
435
436 # 알림 시작
437 await client.start_notify(char_uuid, handler)
438 print(f"알림 구독 시작: {char_uuid}")
439 print(f"{duration}초 동안 수신 중...\n")
440
441 # 지정된 시간 동안 수신
442 await asyncio.sleep(duration)
443
444 # 알림 중지
445 await client.stop_notify(char_uuid)
446 print("\n알림 구독 종료")
447
448 except Exception as e:
449 print(f"구독 오류: {e}")
450 print("시뮬레이션 모드로 재시도...")
451 await subscribe_notifications(address, char_uuid, sensor_type, duration, use_simulation=True)
452
453
454# ============================================================================
455# BLE 센서 모니터 클래스
456# ============================================================================
457
458class BLESensorMonitor:
459 """BLE 환경 센서 모니터링 클래스"""
460
461 def __init__(self, device_address: Optional[str] = None, use_simulation: bool = False):
462 self.device_address = device_address
463 self.use_simulation = use_simulation or not BLEAK_AVAILABLE
464 self.data_buffer = []
465
466 async def start_monitoring(self, duration: float = 60):
467 """
468 모니터링 시작
469
470 Args:
471 duration: 모니터링 기간 (초)
472 """
473 if not self.device_address:
474 print("오류: 장치 주소가 지정되지 않았습니다.")
475 return
476
477 print(f"=== BLE 센서 모니터링 시작 ===")
478 print(f"장치: {self.device_address}")
479 print(f"기간: {duration}초")
480 print(f"모드: {'시뮬레이션' if self.use_simulation else '실제'}\n")
481
482 if self.use_simulation:
483 # 시뮬레이션 모니터링
484 for i in range(int(duration / 5)):
485 data = await read_sensor_data(self.device_address, use_simulation=True)
486 timestamp = datetime.now()
487
488 print(f"[{timestamp.strftime('%H:%M:%S')}] 수신:")
489 for key, value in data.items():
490 print(f" {key}: {value}")
491 self.data_buffer.append({
492 'type': key,
493 'value': value,
494 'timestamp': timestamp
495 })
496
497 await asyncio.sleep(5)
498 else:
499 # 실제 BLE 모니터링
500 await subscribe_notifications(
501 self.device_address,
502 BLE_UUID.TEMPERATURE,
503 'temperature',
504 duration / 2
505 )
506
507 print("\n=== 모니터링 종료 ===")
508 print(f"수집된 데이터: {len(self.data_buffer)}개")
509
510 def get_summary(self) -> Dict:
511 """수집된 데이터 요약"""
512 if not self.data_buffer:
513 return {}
514
515 summary = {}
516 data_types = set(d['type'] for d in self.data_buffer)
517
518 for dtype in data_types:
519 values = [d['value'] for d in self.data_buffer if d['type'] == dtype]
520 summary[dtype] = {
521 'min': min(values),
522 'max': max(values),
523 'avg': sum(values) / len(values),
524 'count': len(values)
525 }
526
527 return summary
528
529
530# ============================================================================
531# 메인 함수
532# ============================================================================
533
534def print_devices(devices: List):
535 """장치 리스트 출력"""
536 print(f"\n발견된 장치: {len(devices)}개\n")
537
538 for i, device in enumerate(devices, 1):
539 name = device.name or 'Unknown'
540 address = device.address
541 rssi = getattr(device, 'rssi', 'N/A')
542 print(f"{i:2}. {name:20} - {address} (RSSI: {rssi} dBm)")
543
544
545async def main_async():
546 """비동기 메인 함수"""
547 if len(sys.argv) < 2:
548 print("BLE 장치 스캐너 및 GATT 클라이언트 예제")
549 print("\n사용법:")
550 print(" python ble_scanner.py scan - BLE 장치 스캔")
551 print(" python ble_scanner.py scan <필터> - 이름 필터로 스캔")
552 print(" python ble_scanner.py explore <주소> - 장치 탐색")
553 print(" python ble_scanner.py read <주소> - 센서 데이터 읽기")
554 print(" python ble_scanner.py notify <주소> - 알림 수신")
555 print(" python ble_scanner.py monitor <주소> - 센서 모니터링")
556 print("\n예제:")
557 print(" python ble_scanner.py scan")
558 print(" python ble_scanner.py scan temp")
559 print(" python ble_scanner.py explore AA:BB:CC:DD:EE:FF")
560 print("\n주의: bleak 라이브러리가 없으면 시뮬레이션 모드로 실행됩니다.")
561 return
562
563 command = sys.argv[1].lower()
564
565 if command == 'scan':
566 name_filter = sys.argv[2] if len(sys.argv) > 2 else None
567 if name_filter:
568 devices = await scan_with_filter(name_filter, timeout=10.0)
569 else:
570 devices = await scan_ble_devices(timeout=10.0)
571 print_devices(devices)
572
573 elif command == 'explore':
574 if len(sys.argv) < 3:
575 print("오류: 장치 주소를 입력하세요")
576 print("예제: python ble_scanner.py explore AA:BB:CC:DD:EE:FF")
577 return
578 address = sys.argv[2]
579 await connect_and_explore(address)
580
581 elif command == 'read':
582 if len(sys.argv) < 3:
583 print("오류: 장치 주소를 입력하세요")
584 return
585 address = sys.argv[2]
586 data = await read_sensor_data(address)
587 print("\n=== 센서 데이터 ===")
588 for key, value in data.items():
589 print(f" {key}: {value}")
590
591 elif command == 'notify':
592 if len(sys.argv) < 3:
593 print("오류: 장치 주소를 입력하세요")
594 return
595 address = sys.argv[2]
596 await subscribe_notifications(
597 address,
598 BLE_UUID.TEMPERATURE,
599 'temperature',
600 duration=30
601 )
602
603 elif command == 'monitor':
604 if len(sys.argv) < 3:
605 print("오류: 장치 주소를 입력하세요")
606 return
607 address = sys.argv[2]
608 monitor = BLESensorMonitor(address)
609 await monitor.start_monitoring(duration=30)
610
611 # 요약 출력
612 summary = monitor.get_summary()
613 if summary:
614 print("\n=== 데이터 요약 ===")
615 for sensor, stats in summary.items():
616 print(f"{sensor}:")
617 print(f" 최소: {stats['min']:.2f}")
618 print(f" 최대: {stats['max']:.2f}")
619 print(f" 평균: {stats['avg']:.2f}")
620 print(f" 개수: {stats['count']}")
621
622 else:
623 print(f"알 수 없는 명령: {command}")
624 print("'python ble_scanner.py'를 실행하여 도움말을 확인하세요")
625
626
627def main():
628 """메인 함수"""
629 try:
630 asyncio.run(main_async())
631 except KeyboardInterrupt:
632 print("\n\n사용자 중단")
633
634
635if __name__ == "__main__":
636 main()