ble_scanner.py

Download
python 637 lines 20.7 KB
  1#!/usr/bin/env python3
  2"""
  3BLE (Bluetooth Low Energy) 장치 스캐너 및 GATT 클라이언트 예제
  4
  5이 스크립트는 다음 기능을 제공합니다:
  61. BLE 장치 스캔 (시뮬레이션 모드 지원)
  72. GATT 서비스 및 특성 탐색
  83. BLE 특성 값 읽기
  94. BLE 알림(Notification) 수신
 105. 센서 데이터 수신 예제
 11
 12참고: content/ko/IoT_Embedded/05_BLE_Connectivity.md
 13
 14주의: 실제 BLE 기능을 사용하려면 bleak 라이브러리가 필요합니다.
 15      pip install bleak
 16"""
 17
 18import asyncio
 19import sys
 20import time
 21import random
 22import struct
 23from typing import Optional, List, Dict, Callable
 24from datetime import datetime
 25
 26
 27# ============================================================================
 28# BLE 라이브러리 임포트 (선택적)
 29# ============================================================================
 30
 31try:
 32    from bleak import BleakScanner, BleakClient
 33    BLEAK_AVAILABLE = True
 34except ImportError:
 35    BLEAK_AVAILABLE = False
 36    print("⚠️  bleak 라이브러리를 찾을 수 없습니다.")
 37    print("   시뮬레이션 모드로 실행됩니다.")
 38    print("   실제 BLE 기능을 사용하려면 'pip install bleak'를 실행하세요.\n")
 39
 40
 41# ============================================================================
 42# 표준 BLE UUID
 43# ============================================================================
 44
 45class BLE_UUID:
 46    """표준 BLE 서비스 및 특성 UUID"""
 47
 48    # 표준 서비스 UUID (16-bit)
 49    GENERIC_ACCESS = "00001800-0000-1000-8000-00805f9b34fb"
 50    GENERIC_ATTRIBUTE = "00001801-0000-1000-8000-00805f9b34fb"
 51    DEVICE_INFORMATION = "0000180a-0000-1000-8000-00805f9b34fb"
 52    BATTERY_SERVICE = "0000180f-0000-1000-8000-00805f9b34fb"
 53    ENVIRONMENTAL_SENSING = "0000181a-0000-1000-8000-00805f9b34fb"
 54    HEART_RATE = "0000180d-0000-1000-8000-00805f9b34fb"
 55
 56    # 표준 특성 UUID
 57    DEVICE_NAME = "00002a00-0000-1000-8000-00805f9b34fb"
 58    BATTERY_LEVEL = "00002a19-0000-1000-8000-00805f9b34fb"
 59    TEMPERATURE = "00002a6e-0000-1000-8000-00805f9b34fb"
 60    HUMIDITY = "00002a6f-0000-1000-8000-00805f9b34fb"
 61    HEART_RATE_MEASUREMENT = "00002a37-0000-1000-8000-00805f9b34fb"
 62
 63    @staticmethod
 64    def uuid_16_to_128(uuid_16: str) -> str:
 65        """
 66        16-bit UUID를 128-bit BLE 기본 UUID로 변환
 67
 68        Args:
 69            uuid_16: 16-bit UUID (예: "0x180F")
 70
 71        Returns:
 72            str: 128-bit UUID
 73        """
 74        base_uuid = "00000000-0000-1000-8000-00805f9b34fb"
 75        uuid_16_clean = uuid_16.replace("0x", "").lower()
 76        return f"0000{uuid_16_clean}{base_uuid[8:]}"
 77
 78
 79# ============================================================================
 80# 시뮬레이션 모드
 81# ============================================================================
 82
 83class SimulatedBLEDevice:
 84    """시뮬레이션용 BLE 장치"""
 85
 86    def __init__(self, name: str, address: str, rssi: int):
 87        self.name = name
 88        self.address = address
 89        self.rssi = rssi
 90
 91    def __repr__(self):
 92        return f"SimulatedBLEDevice(name='{self.name}', address='{self.address}', rssi={self.rssi})"
 93
 94
 95def simulate_ble_scan(timeout: float = 10.0) -> List[SimulatedBLEDevice]:
 96    """
 97    BLE 스캔 시뮬레이션
 98
 99    Args:
100        timeout: 스캔 시간 (초)
101
102    Returns:
103        list: 시뮬레이션된 BLE 장치 리스트
104    """
105    print(f"[시뮬레이션] BLE 장치 스캔 중... ({timeout}초)")
106    time.sleep(2)  # 스캔 시뮬레이션
107
108    devices = [
109        SimulatedBLEDevice("TempSensor-01", "AA:BB:CC:DD:EE:01", -45),
110        SimulatedBLEDevice("HeartRate-BLE", "AA:BB:CC:DD:EE:02", -52),
111        SimulatedBLEDevice("Battery-Monitor", "AA:BB:CC:DD:EE:03", -38),
112        SimulatedBLEDevice("EnvSensor", "AA:BB:CC:DD:EE:04", -61),
113        SimulatedBLEDevice("Smart-Watch", "AA:BB:CC:DD:EE:05", -55),
114        SimulatedBLEDevice(None, "AA:BB:CC:DD:EE:06", -72),  # 이름 없는 장치
115    ]
116
117    return devices
118
119
120def simulate_read_characteristic(char_uuid: str) -> bytes:
121    """
122    BLE 특성 읽기 시뮬레이션
123
124    Args:
125        char_uuid: 특성 UUID
126
127    Returns:
128        bytes: 시뮬레이션 데이터
129    """
130    if BLE_UUID.BATTERY_LEVEL in char_uuid:
131        # 배터리 레벨 (0-100%)
132        return bytes([random.randint(50, 100)])
133
134    elif BLE_UUID.TEMPERATURE in char_uuid:
135        # 온도 (0.01도 단위, 16-bit 정수)
136        temp = random.uniform(20.0, 30.0)
137        temp_raw = int(temp * 100)
138        return struct.pack('<h', temp_raw)
139
140    elif BLE_UUID.HUMIDITY in char_uuid:
141        # 습도 (0.01% 단위, 16-bit 정수)
142        humidity = random.uniform(40.0, 70.0)
143        humidity_raw = int(humidity * 100)
144        return struct.pack('<H', humidity_raw)
145
146    else:
147        # 기본값
148        return b'\x00\x00'
149
150
151# ============================================================================
152# BLE 스캔 함수
153# ============================================================================
154
155async def scan_ble_devices(timeout: float = 10.0, use_simulation: bool = False) -> List:
156    """
157    BLE 장치 스캔
158
159    Args:
160        timeout: 스캔 시간 (초)
161        use_simulation: 강제로 시뮬레이션 모드 사용
162
163    Returns:
164        list: 발견된 BLE 장치 리스트
165    """
166    if not BLEAK_AVAILABLE or use_simulation:
167        return simulate_ble_scan(timeout)
168
169    print(f"BLE 장치 스캔 중... ({timeout}초)")
170
171    try:
172        devices = await BleakScanner.discover(timeout=timeout)
173        return devices
174    except Exception as e:
175        print(f"스캔 오류: {e}")
176        print("시뮬레이션 모드로 전환합니다...")
177        return simulate_ble_scan(timeout)
178
179
180async def scan_with_filter(name_filter: Optional[str] = None, timeout: float = 10.0) -> List:
181    """
182    필터링된 BLE 스캔
183
184    Args:
185        name_filter: 장치 이름 필터 (부분 일치)
186        timeout: 스캔 시간
187
188    Returns:
189        list: 필터링된 장치 리스트
190    """
191    devices = await scan_ble_devices(timeout)
192
193    if name_filter:
194        devices = [d for d in devices if d.name and name_filter.lower() in d.name.lower()]
195
196    return devices
197
198
199async def continuous_scan(duration: float = 30.0, callback: Optional[Callable] = None):
200    """
201    연속 BLE 스캔
202
203    Args:
204        duration: 스캔 기간 (초)
205        callback: 장치 발견 시 호출할 콜백 함수
206    """
207    if not BLEAK_AVAILABLE:
208        print("[시뮬레이션] 연속 스캔은 시뮬레이션 모드에서 지원되지 않습니다.")
209        devices = simulate_ble_scan(duration)
210        for device in devices:
211            print(f"발견: {device.name} ({device.address}) - RSSI: {device.rssi} dBm")
212        return
213
214    def detection_callback(device, advertisement_data):
215        print(f"발견: {device.name or 'Unknown'} ({device.address}) - RSSI: {device.rssi} dBm")
216        if callback:
217            callback(device, advertisement_data)
218
219    scanner = BleakScanner(detection_callback=detection_callback)
220
221    print(f"연속 스캔 시작 ({duration}초)")
222    await scanner.start()
223    await asyncio.sleep(duration)
224    await scanner.stop()
225    print("스캔 종료")
226
227
228# ============================================================================
229# BLE 연결 및 탐색
230# ============================================================================
231
232async def connect_and_explore(address: str, use_simulation: bool = False):
233    """
234    BLE 장치 연결 및 서비스/특성 탐색
235
236    Args:
237        address: BLE 장치 MAC 주소
238        use_simulation: 시뮬레이션 모드 사용
239    """
240    if not BLEAK_AVAILABLE or use_simulation:
241        print(f"[시뮬레이션] 연결 중: {address}")
242        print(f"[시뮬레이션] 연결됨!")
243        print(f"\n서비스: {BLE_UUID.ENVIRONMENTAL_SENSING}")
244        print(f"  설명: Environmental Sensing")
245        print(f"    특성: {BLE_UUID.TEMPERATURE}")
246        print(f"      속성: ['read', 'notify']")
247        print(f"      값: {simulate_read_characteristic(BLE_UUID.TEMPERATURE).hex()}")
248        print(f"    특성: {BLE_UUID.HUMIDITY}")
249        print(f"      속성: ['read', 'notify']")
250        print(f"      값: {simulate_read_characteristic(BLE_UUID.HUMIDITY).hex()}")
251        return
252
253    print(f"연결 중: {address}")
254
255    try:
256        async with BleakClient(address) as client:
257            print(f"연결됨! MTU: {client.mtu_size}")
258
259            # 서비스 탐색
260            for service in client.services:
261                print(f"\n서비스: {service.uuid}")
262                print(f"  설명: {service.description}")
263
264                # 특성 탐색
265                for char in service.characteristics:
266                    print(f"    특성: {char.uuid}")
267                    print(f"      속성: {char.properties}")
268
269                    # 읽기 가능하면 값 읽기
270                    if "read" in char.properties:
271                        try:
272                            value = await client.read_gatt_char(char.uuid)
273                            print(f"      값: {value.hex()}")
274                        except Exception as e:
275                            print(f"      읽기 실패: {e}")
276
277    except Exception as e:
278        print(f"연결 오류: {e}")
279        print("시뮬레이션 모드로 재시도...")
280        await connect_and_explore(address, use_simulation=True)
281
282
283# ============================================================================
284# BLE 센서 데이터 읽기
285# ============================================================================
286
287async def read_sensor_data(address: str, use_simulation: bool = False) -> Dict:
288    """
289    BLE 센서 데이터 읽기
290
291    Args:
292        address: BLE 장치 주소
293        use_simulation: 시뮬레이션 모드
294
295    Returns:
296        dict: 센서 데이터
297    """
298    result = {}
299
300    if not BLEAK_AVAILABLE or use_simulation:
301        print(f"[시뮬레이션] 센서 데이터 읽기: {address}")
302
303        # 배터리
304        battery_data = simulate_read_characteristic(BLE_UUID.BATTERY_LEVEL)
305        result['battery'] = battery_data[0]
306
307        # 온도
308        temp_data = simulate_read_characteristic(BLE_UUID.TEMPERATURE)
309        temp_raw = struct.unpack('<h', temp_data)[0]
310        result['temperature'] = temp_raw * 0.01
311
312        # 습도
313        humidity_data = simulate_read_characteristic(BLE_UUID.HUMIDITY)
314        humidity_raw = struct.unpack('<H', humidity_data)[0]
315        result['humidity'] = humidity_raw * 0.01
316
317        return result
318
319    try:
320        async with BleakClient(address) as client:
321            # 배터리 레벨
322            try:
323                data = await client.read_gatt_char(BLE_UUID.BATTERY_LEVEL)
324                result['battery'] = data[0]
325            except Exception:
326                pass
327
328            # 온도
329            try:
330                data = await client.read_gatt_char(BLE_UUID.TEMPERATURE)
331                temp_raw = struct.unpack('<h', data[:2])[0]
332                result['temperature'] = temp_raw * 0.01
333            except Exception:
334                pass
335
336            # 습도
337            try:
338                data = await client.read_gatt_char(BLE_UUID.HUMIDITY)
339                humidity_raw = struct.unpack('<H', data[:2])[0]
340                result['humidity'] = humidity_raw * 0.01
341            except Exception:
342                pass
343
344    except Exception as e:
345        print(f"읽기 오류: {e}")
346        print("시뮬레이션 모드로 재시도...")
347        return await read_sensor_data(address, use_simulation=True)
348
349    return result
350
351
352# ============================================================================
353# BLE 알림 수신
354# ============================================================================
355
356def create_notification_handler(sensor_type: str):
357    """
358    알림 핸들러 생성
359
360    Args:
361        sensor_type: 센서 타입 ('temperature', 'humidity', etc.)
362
363    Returns:
364        function: 알림 핸들러 함수
365    """
366    def handler(sender, data):
367        timestamp = datetime.now().strftime("%H:%M:%S")
368        print(f"[{timestamp}] 수신 ({sensor_type}): {data.hex()}")
369
370        if sensor_type == 'temperature':
371            temp_raw = struct.unpack('<h', data[:2])[0]
372            temp = temp_raw * 0.01
373            print(f"  온도: {temp:.2f}°C")
374
375        elif sensor_type == 'humidity':
376            humidity_raw = struct.unpack('<H', data[:2])[0]
377            humidity = humidity_raw * 0.01
378            print(f"  습도: {humidity:.2f}%")
379
380        elif sensor_type == 'battery':
381            battery = data[0]
382            print(f"  배터리: {battery}%")
383
384        elif sensor_type == 'heart_rate':
385            flags = data[0]
386            if flags & 0x01:  # 16-bit heart rate
387                hr = int.from_bytes(data[1:3], 'little')
388            else:  # 8-bit heart rate
389                hr = data[1]
390            print(f"  심박수: {hr} bpm")
391
392    return handler
393
394
395async def subscribe_notifications(
396    address: str,
397    char_uuid: str,
398    sensor_type: str = 'unknown',
399    duration: float = 60,
400    use_simulation: bool = False
401):
402    """
403    BLE 알림 구독
404
405    Args:
406        address: BLE 장치 주소
407        char_uuid: 특성 UUID
408        sensor_type: 센서 타입
409        duration: 구독 기간 (초)
410        use_simulation: 시뮬레이션 모드
411    """
412    if not BLEAK_AVAILABLE or use_simulation:
413        print(f"[시뮬레이션] 알림 구독: {address}")
414        print(f"[시뮬레이션] 특성: {char_uuid}")
415        print(f"\n{duration}초 동안 시뮬레이션 데이터 수신...\n")
416
417        handler = create_notification_handler(sensor_type)
418
419        for i in range(int(duration / 2)):
420            # 시뮬레이션 데이터 생성
421            data = simulate_read_characteristic(char_uuid)
422            handler(char_uuid, data)
423            await asyncio.sleep(2)
424
425        print("\n구독 종료")
426        return
427
428    print(f"연결 중: {address}")
429
430    try:
431        async with BleakClient(address) as client:
432            print(f"연결됨!")
433
434            handler = create_notification_handler(sensor_type)
435
436            # 알림 시작
437            await client.start_notify(char_uuid, handler)
438            print(f"알림 구독 시작: {char_uuid}")
439            print(f"{duration}초 동안 수신 중...\n")
440
441            # 지정된 시간 동안 수신
442            await asyncio.sleep(duration)
443
444            # 알림 중지
445            await client.stop_notify(char_uuid)
446            print("\n알림 구독 종료")
447
448    except Exception as e:
449        print(f"구독 오류: {e}")
450        print("시뮬레이션 모드로 재시도...")
451        await subscribe_notifications(address, char_uuid, sensor_type, duration, use_simulation=True)
452
453
454# ============================================================================
455# BLE 센서 모니터 클래스
456# ============================================================================
457
458class BLESensorMonitor:
459    """BLE 환경 센서 모니터링 클래스"""
460
461    def __init__(self, device_address: Optional[str] = None, use_simulation: bool = False):
462        self.device_address = device_address
463        self.use_simulation = use_simulation or not BLEAK_AVAILABLE
464        self.data_buffer = []
465
466    async def start_monitoring(self, duration: float = 60):
467        """
468        모니터링 시작
469
470        Args:
471            duration: 모니터링 기간 (초)
472        """
473        if not self.device_address:
474            print("오류: 장치 주소가 지정되지 않았습니다.")
475            return
476
477        print(f"=== BLE 센서 모니터링 시작 ===")
478        print(f"장치: {self.device_address}")
479        print(f"기간: {duration}초")
480        print(f"모드: {'시뮬레이션' if self.use_simulation else '실제'}\n")
481
482        if self.use_simulation:
483            # 시뮬레이션 모니터링
484            for i in range(int(duration / 5)):
485                data = await read_sensor_data(self.device_address, use_simulation=True)
486                timestamp = datetime.now()
487
488                print(f"[{timestamp.strftime('%H:%M:%S')}] 수신:")
489                for key, value in data.items():
490                    print(f"  {key}: {value}")
491                    self.data_buffer.append({
492                        'type': key,
493                        'value': value,
494                        'timestamp': timestamp
495                    })
496
497                await asyncio.sleep(5)
498        else:
499            # 실제 BLE 모니터링
500            await subscribe_notifications(
501                self.device_address,
502                BLE_UUID.TEMPERATURE,
503                'temperature',
504                duration / 2
505            )
506
507        print("\n=== 모니터링 종료 ===")
508        print(f"수집된 데이터: {len(self.data_buffer)}개")
509
510    def get_summary(self) -> Dict:
511        """수집된 데이터 요약"""
512        if not self.data_buffer:
513            return {}
514
515        summary = {}
516        data_types = set(d['type'] for d in self.data_buffer)
517
518        for dtype in data_types:
519            values = [d['value'] for d in self.data_buffer if d['type'] == dtype]
520            summary[dtype] = {
521                'min': min(values),
522                'max': max(values),
523                'avg': sum(values) / len(values),
524                'count': len(values)
525            }
526
527        return summary
528
529
530# ============================================================================
531# 메인 함수
532# ============================================================================
533
534def print_devices(devices: List):
535    """장치 리스트 출력"""
536    print(f"\n발견된 장치: {len(devices)}\n")
537
538    for i, device in enumerate(devices, 1):
539        name = device.name or 'Unknown'
540        address = device.address
541        rssi = getattr(device, 'rssi', 'N/A')
542        print(f"{i:2}. {name:20} - {address} (RSSI: {rssi} dBm)")
543
544
545async def main_async():
546    """비동기 메인 함수"""
547    if len(sys.argv) < 2:
548        print("BLE 장치 스캐너 및 GATT 클라이언트 예제")
549        print("\n사용법:")
550        print("  python ble_scanner.py scan              - BLE 장치 스캔")
551        print("  python ble_scanner.py scan <필터>       - 이름 필터로 스캔")
552        print("  python ble_scanner.py explore <주소>    - 장치 탐색")
553        print("  python ble_scanner.py read <주소>       - 센서 데이터 읽기")
554        print("  python ble_scanner.py notify <주소>     - 알림 수신")
555        print("  python ble_scanner.py monitor <주소>    - 센서 모니터링")
556        print("\n예제:")
557        print("  python ble_scanner.py scan")
558        print("  python ble_scanner.py scan temp")
559        print("  python ble_scanner.py explore AA:BB:CC:DD:EE:FF")
560        print("\n주의: bleak 라이브러리가 없으면 시뮬레이션 모드로 실행됩니다.")
561        return
562
563    command = sys.argv[1].lower()
564
565    if command == 'scan':
566        name_filter = sys.argv[2] if len(sys.argv) > 2 else None
567        if name_filter:
568            devices = await scan_with_filter(name_filter, timeout=10.0)
569        else:
570            devices = await scan_ble_devices(timeout=10.0)
571        print_devices(devices)
572
573    elif command == 'explore':
574        if len(sys.argv) < 3:
575            print("오류: 장치 주소를 입력하세요")
576            print("예제: python ble_scanner.py explore AA:BB:CC:DD:EE:FF")
577            return
578        address = sys.argv[2]
579        await connect_and_explore(address)
580
581    elif command == 'read':
582        if len(sys.argv) < 3:
583            print("오류: 장치 주소를 입력하세요")
584            return
585        address = sys.argv[2]
586        data = await read_sensor_data(address)
587        print("\n=== 센서 데이터 ===")
588        for key, value in data.items():
589            print(f"  {key}: {value}")
590
591    elif command == 'notify':
592        if len(sys.argv) < 3:
593            print("오류: 장치 주소를 입력하세요")
594            return
595        address = sys.argv[2]
596        await subscribe_notifications(
597            address,
598            BLE_UUID.TEMPERATURE,
599            'temperature',
600            duration=30
601        )
602
603    elif command == 'monitor':
604        if len(sys.argv) < 3:
605            print("오류: 장치 주소를 입력하세요")
606            return
607        address = sys.argv[2]
608        monitor = BLESensorMonitor(address)
609        await monitor.start_monitoring(duration=30)
610
611        # 요약 출력
612        summary = monitor.get_summary()
613        if summary:
614            print("\n=== 데이터 요약 ===")
615            for sensor, stats in summary.items():
616                print(f"{sensor}:")
617                print(f"  최소: {stats['min']:.2f}")
618                print(f"  최대: {stats['max']:.2f}")
619                print(f"  평균: {stats['avg']:.2f}")
620                print(f"  개수: {stats['count']}")
621
622    else:
623        print(f"알 수 없는 명령: {command}")
624        print("'python ble_scanner.py'를 실행하여 도움말을 확인하세요")
625
626
627def main():
628    """메인 함수"""
629    try:
630        asyncio.run(main_async())
631    except KeyboardInterrupt:
632        print("\n\n사용자 중단")
633
634
635if __name__ == "__main__":
636    main()