05. BLE ์—ฐ๊ฒฐ

05. BLE ์—ฐ๊ฒฐ

ํ•™์Šต ๋ชฉํ‘œ

  • BLE(Bluetooth Low Energy) ํ”„๋กœํ† ์ฝœ ๊ฐœ์š” ์ดํ•ด
  • GATT ๊ตฌ์กฐ (์„œ๋น„์Šค, ํŠน์„ฑ) ํŒŒ์•…
  • Python bleak ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์‚ฌ์šฉ๋ฒ• ์Šต๋“
  • BLE ์žฅ์น˜ ์Šค์บ” ๋ฐ ์—ฐ๊ฒฐ
  • ์„ผ์„œ ๋ฐ์ดํ„ฐ ์ˆ˜์‹ 

1. BLE ํ”„๋กœํ† ์ฝœ ๊ฐœ์š”

1.1 BLE vs ํด๋ž˜์‹ Bluetooth

ํŠน์„ฑ BLE (Bluetooth Low Energy) ํด๋ž˜์‹ Bluetooth
์ „๋ ฅ ์†Œ๋น„ ๋งค์šฐ ๋‚ฎ์Œ ๋†’์Œ
๋ฐ์ดํ„ฐ ์ „์†ก๋ฅ  1-2 Mbps 1-3 Mbps
๋ฒ”์œ„ ~100m ~100m
์ง€์—ฐ ์‹œ๊ฐ„ ~6ms ~100ms
ํŽ˜์–ด๋ง ๊ฐ„๋‹จ/์ž๋™ ๋ณต์žก
์šฉ๋„ IoT ์„ผ์„œ, ์›จ์–ด๋Ÿฌ๋ธ” ์˜ค๋””์˜ค, ํŒŒ์ผ ์ „์†ก

1.2 BLE ํ”„๋กœํ† ์ฝœ ์Šคํƒ

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    BLE ํ”„๋กœํ† ์ฝœ ์Šคํƒ                         โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                              โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚
โ”‚  โ”‚                    Application                       โ”‚    โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚
โ”‚                           โ”‚                                  โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚
โ”‚  โ”‚                GAP (Generic Access Profile)          โ”‚    โ”‚
โ”‚  โ”‚           ๋””๋ฐ”์ด์Šค ๊ฒ€์ƒ‰, ์—ฐ๊ฒฐ, ๋ณด์•ˆ                   โ”‚    โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚
โ”‚                           โ”‚                                  โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚
โ”‚  โ”‚             GATT (Generic Attribute Profile)         โ”‚    โ”‚
โ”‚  โ”‚              ์„œ๋น„์Šค, ํŠน์„ฑ, ๋ฐ์ดํ„ฐ ๊ตํ™˜                โ”‚    โ”‚
โ”‚  โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค    โ”‚
โ”‚  โ”‚                 ATT (Attribute Protocol)             โ”‚    โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚
โ”‚                           โ”‚                                  โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚
โ”‚  โ”‚                    L2CAP                             โ”‚    โ”‚
โ”‚  โ”‚           ๋…ผ๋ฆฌ ๋งํฌ ์ œ์–ด ๋ฐ ์ ์‘ ํ”„๋กœํ† ์ฝœ             โ”‚    โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚
โ”‚                           โ”‚                                  โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚
โ”‚  โ”‚              Link Layer + Physical Layer             โ”‚    โ”‚
โ”‚  โ”‚                  ๋ฌด์„  ํ†ต์‹  ์ฒ˜๋ฆฌ                       โ”‚    โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚
โ”‚                                                              โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

1.3 BLE ์—ญํ• 

# BLE ์—ญํ•  ์ •์˜
ble_roles = {
    "Central (Master)": {
        "description": "๋‹ค๋ฅธ ์žฅ์น˜๋ฅผ ์Šค์บ”ํ•˜๊ณ  ์—ฐ๊ฒฐ์„ ์‹œ์ž‘",
        "example": "์Šค๋งˆํŠธํฐ, ๋ผ์ฆˆ๋ฒ ๋ฆฌํŒŒ์ด",
        "behavior": ["์Šค์บ”", "์—ฐ๊ฒฐ ์š”์ฒญ", "๋ฐ์ดํ„ฐ ์š”์ฒญ"]
    },
    "Peripheral (Slave)": {
        "description": "๊ด‘๊ณ ํ•˜๊ณ  ์—ฐ๊ฒฐ์„ ๊ธฐ๋‹ค๋ฆผ",
        "example": "์„ผ์„œ, ๋น„์ฝ˜, ์›จ์–ด๋Ÿฌ๋ธ”",
        "behavior": ["๊ด‘๊ณ ", "์—ฐ๊ฒฐ ๋Œ€๊ธฐ", "๋ฐ์ดํ„ฐ ์ œ๊ณต"]
    },
    "Observer": {
        "description": "๊ด‘๊ณ  ํŒจํ‚ท๋งŒ ์ˆ˜์‹  (์—ฐ๊ฒฐ ์—†์Œ)",
        "example": "๋น„์ฝ˜ ๋ฆฌ๋”",
        "behavior": ["์Šค์บ”๋งŒ"]
    },
    "Broadcaster": {
        "description": "๊ด‘๊ณ  ํŒจํ‚ท๋งŒ ์†ก์‹  (์—ฐ๊ฒฐ ์—†์Œ)",
        "example": "๋น„์ฝ˜",
        "behavior": ["๊ด‘๊ณ ๋งŒ"]
    }
}

2. GATT ๊ตฌ์กฐ

2.1 GATT ๊ณ„์ธต ๊ตฌ์กฐ

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    GATT ๊ณ„์ธต ๊ตฌ์กฐ                            โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                              โ”‚
โ”‚   GATT Server (Peripheral)                                   โ”‚
โ”‚   โ”‚                                                          โ”‚
โ”‚   โ”œโ”€โ”€ Profile                                                โ”‚
โ”‚   โ”‚   โ”‚                                                      โ”‚
โ”‚   โ”‚   โ”œโ”€โ”€ Service (UUID: 0x180F - Battery Service)          โ”‚
โ”‚   โ”‚   โ”‚   โ”‚                                                  โ”‚
โ”‚   โ”‚   โ”‚   โ””โ”€โ”€ Characteristic (UUID: 0x2A19 - Battery Level)โ”‚
โ”‚   โ”‚   โ”‚       โ”œโ”€โ”€ Value: 85 (0-100%)                        โ”‚
โ”‚   โ”‚   โ”‚       โ”œโ”€โ”€ Properties: Read, Notify                  โ”‚
โ”‚   โ”‚   โ”‚       โ””โ”€โ”€ Descriptors                               โ”‚
โ”‚   โ”‚   โ”‚           โ””โ”€โ”€ CCCD (Client Config Descriptor)       โ”‚
โ”‚   โ”‚   โ”‚                                                      โ”‚
โ”‚   โ”‚   โ””โ”€โ”€ Service (UUID: 0x181A - Environmental Sensing)    โ”‚
โ”‚   โ”‚       โ”‚                                                  โ”‚
โ”‚   โ”‚       โ”œโ”€โ”€ Characteristic: Temperature (0x2A6E)          โ”‚
โ”‚   โ”‚       โ”‚   โ””โ”€โ”€ Value: 25.5ยฐC                             โ”‚
โ”‚   โ”‚       โ”‚                                                  โ”‚
โ”‚   โ”‚       โ””โ”€โ”€ Characteristic: Humidity (0x2A6F)             โ”‚
โ”‚   โ”‚           โ””โ”€โ”€ Value: 60%                                 โ”‚
โ”‚   โ”‚                                                          โ”‚
โ”‚   โ””โ”€โ”€ ...                                                    โ”‚
โ”‚                                                              โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

2.2 ํ‘œ์ค€ UUID

# ํ‘œ์ค€ BLE ์„œ๋น„์Šค UUID (16-bit)
standard_services = {
    "0x1800": "Generic Access",
    "0x1801": "Generic Attribute",
    "0x180A": "Device Information",
    "0x180F": "Battery Service",
    "0x181A": "Environmental Sensing",
    "0x180D": "Heart Rate",
}

# ํ‘œ์ค€ ํŠน์„ฑ UUID
standard_characteristics = {
    "0x2A00": "Device Name",
    "0x2A19": "Battery Level",
    "0x2A6E": "Temperature",
    "0x2A6F": "Humidity",
    "0x2A37": "Heart Rate Measurement",
}

# 16-bit UUID๋ฅผ 128-bit๋กœ ๋ณ€ํ™˜
def uuid_16_to_128(uuid_16: str) -> str:
    """16-bit UUID๋ฅผ 128-bit BLE ๊ธฐ๋ณธ UUID๋กœ ๋ณ€ํ™˜"""
    base_uuid = "00000000-0000-1000-8000-00805f9b34fb"
    uuid_16_clean = uuid_16.replace("0x", "").lower()
    return f"0000{uuid_16_clean}{base_uuid[8:]}"

# ์˜ˆ: 0x180F -> 0000180f-0000-1000-8000-00805f9b34fb

2.3 ํŠน์„ฑ ์†์„ฑ

# ํŠน์„ฑ ์†์„ฑ (Properties)
characteristic_properties = {
    "Broadcast": 0x01,       # ๊ด‘๊ณ ์— ํฌํ•จ ๊ฐ€๋Šฅ
    "Read": 0x02,            # ์ฝ๊ธฐ ๊ฐ€๋Šฅ
    "Write No Response": 0x04,  # ์‘๋‹ต ์—†์ด ์“ฐ๊ธฐ
    "Write": 0x08,           # ์‘๋‹ต ์žˆ๋Š” ์“ฐ๊ธฐ
    "Notify": 0x10,          # ์•Œ๋ฆผ (์‘๋‹ต ์—†์Œ)
    "Indicate": 0x20,        # ํ‘œ์‹œ (์‘๋‹ต ์žˆ์Œ)
}

def parse_properties(props: int) -> list:
    """์†์„ฑ ๋น„ํŠธ๋งˆ์Šคํฌ๋ฅผ ๋ฆฌ์ŠคํŠธ๋กœ ๋ณ€ํ™˜"""
    result = []
    for name, value in characteristic_properties.items():
        if props & value:
            result.append(name)
    return result

# ์˜ˆ: parse_properties(0x12) -> ['Read', 'Notify']

3. bleak ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ

3.1 ์„ค์น˜ ๋ฐ ์„ค์ •

# bleak ์„ค์น˜
pip install bleak

# Linux์—์„œ ์ถ”๊ฐ€ ์„ค์ • (bluetoothctl ์ ‘๊ทผ ๊ถŒํ•œ)
sudo usermod -a -G bluetooth $USER

# D-Bus ์„œ๋น„์Šค ํ™•์ธ
sudo systemctl status bluetooth

3.2 BLE ์žฅ์น˜ ์Šค์บ”

#!/usr/bin/env python3
"""BLE ์žฅ์น˜ ์Šค์บ” (bleak)"""

import asyncio
from bleak import BleakScanner

async def scan_devices(timeout: float = 10.0):
    """์ฃผ๋ณ€ BLE ์žฅ์น˜ ์Šค์บ”"""
    print(f"BLE ์žฅ์น˜ ์Šค์บ” ์ค‘... ({timeout}์ดˆ)")

    devices = await BleakScanner.discover(timeout=timeout)

    print(f"\n๋ฐœ๊ฒฌ๋œ ์žฅ์น˜: {len(devices)}๊ฐœ\n")

    for device in devices:
        rssi = device.rssi if hasattr(device, 'rssi') else 'N/A'
        print(f"  ์ด๋ฆ„: {device.name or 'Unknown'}")
        print(f"  ์ฃผ์†Œ: {device.address}")
        print(f"  RSSI: {rssi} dBm")
        print()

    return devices

async def scan_with_filter(name_filter: str = None):
    """์ด๋ฆ„ ํ•„ํ„ฐ๋กœ ์Šค์บ”"""
    devices = await BleakScanner.discover()

    if name_filter:
        devices = [d for d in devices if d.name and name_filter.lower() in d.name.lower()]

    return devices

async def continuous_scan(callback=None, duration: float = 30.0):
    """์—ฐ์† ์Šค์บ” (์žฅ์น˜ ๋ฐœ๊ฒฌ ์‹œ ์ฝœ๋ฐฑ)"""
    def detection_callback(device, advertisement_data):
        print(f"๋ฐœ๊ฒฌ: {device.name} ({device.address})")
        if callback:
            callback(device, advertisement_data)

    scanner = BleakScanner(detection_callback=detection_callback)

    print(f"์—ฐ์† ์Šค์บ” ์‹œ์ž‘ ({duration}์ดˆ)")
    await scanner.start()
    await asyncio.sleep(duration)
    await scanner.stop()

if __name__ == "__main__":
    asyncio.run(scan_devices(10))

3.3 BLE ์žฅ์น˜ ์—ฐ๊ฒฐ

#!/usr/bin/env python3
"""BLE ์žฅ์น˜ ์—ฐ๊ฒฐ ๋ฐ ์„œ๋น„์Šค ํƒ์ƒ‰"""

import asyncio
from bleak import BleakClient, BleakScanner

async def connect_and_explore(address: str):
    """์žฅ์น˜ ์—ฐ๊ฒฐ ํ›„ ์„œ๋น„์Šค/ํŠน์„ฑ ํƒ์ƒ‰"""
    print(f"์—ฐ๊ฒฐ ์ค‘: {address}")

    async with BleakClient(address) as client:
        print(f"์—ฐ๊ฒฐ๋จ! MTU: {client.mtu_size}")

        # ์„œ๋น„์Šค ํƒ์ƒ‰
        for service in client.services:
            print(f"\n์„œ๋น„์Šค: {service.uuid}")
            print(f"  ์„ค๋ช…: {service.description}")

            # ํŠน์„ฑ ํƒ์ƒ‰
            for char in service.characteristics:
                print(f"    ํŠน์„ฑ: {char.uuid}")
                print(f"      ์†์„ฑ: {char.properties}")

                # ์ฝ๊ธฐ ๊ฐ€๋Šฅํ•˜๋ฉด ๊ฐ’ ์ฝ๊ธฐ
                if "read" in char.properties:
                    try:
                        value = await client.read_gatt_char(char.uuid)
                        print(f"      ๊ฐ’: {value}")
                    except Exception as e:
                        print(f"      ์ฝ๊ธฐ ์‹คํŒจ: {e}")

async def find_and_connect(name_filter: str):
    """์ด๋ฆ„์œผ๋กœ ์žฅ์น˜ ์ฐพ์•„ ์—ฐ๊ฒฐ"""
    print(f"์žฅ์น˜ ๊ฒ€์ƒ‰: '{name_filter}'")

    device = await BleakScanner.find_device_by_name(name_filter)

    if device:
        print(f"์žฅ์น˜ ๋ฐœ๊ฒฌ: {device.address}")
        await connect_and_explore(device.address)
    else:
        print("์žฅ์น˜๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.")

if __name__ == "__main__":
    # MAC ์ฃผ์†Œ๋กœ ์ง์ ‘ ์—ฐ๊ฒฐ
    # asyncio.run(connect_and_explore("AA:BB:CC:DD:EE:FF"))

    # ์ด๋ฆ„์œผ๋กœ ๊ฒ€์ƒ‰ ํ›„ ์—ฐ๊ฒฐ
    asyncio.run(find_and_connect("Temperature"))

4. ์„ผ์„œ ๋ฐ์ดํ„ฐ ์ˆ˜์‹ 

4.1 ํŠน์„ฑ ๊ฐ’ ์ฝ๊ธฐ

#!/usr/bin/env python3
"""BLE ํŠน์„ฑ ๊ฐ’ ์ฝ๊ธฐ"""

import asyncio
from bleak import BleakClient
import struct

# ํ‘œ์ค€ UUID
BATTERY_LEVEL_UUID = "00002a19-0000-1000-8000-00805f9b34fb"
TEMPERATURE_UUID = "00002a6e-0000-1000-8000-00805f9b34fb"

async def read_battery_level(address: str) -> int | None:
    """๋ฐฐํ„ฐ๋ฆฌ ๋ ˆ๋ฒจ ์ฝ๊ธฐ"""
    async with BleakClient(address) as client:
        try:
            data = await client.read_gatt_char(BATTERY_LEVEL_UUID)
            # ๋ฐฐํ„ฐ๋ฆฌ ๋ ˆ๋ฒจ์€ 1๋ฐ”์ดํŠธ (0-100%)
            return data[0]
        except Exception as e:
            print(f"์ฝ๊ธฐ ์‹คํŒจ: {e}")
            return None

async def read_temperature(address: str) -> float | None:
    """์˜จ๋„ ์ฝ๊ธฐ (IEEE 11073 ํ˜•์‹)"""
    async with BleakClient(address) as client:
        try:
            data = await client.read_gatt_char(TEMPERATURE_UUID)
            # ์˜จ๋„๋Š” 16-bit ๋ถ€ํ˜ธ์žˆ๋Š” ์ •์ˆ˜ (0.01๋„ ๋‹จ์œ„)
            temp_raw = struct.unpack('<h', data[:2])[0]
            return temp_raw * 0.01
        except Exception as e:
            print(f"์ฝ๊ธฐ ์‹คํŒจ: {e}")
            return None

async def read_multiple_chars(address: str, char_uuids: list) -> dict:
    """์—ฌ๋Ÿฌ ํŠน์„ฑ ํ•œ ๋ฒˆ์— ์ฝ๊ธฐ"""
    results = {}

    async with BleakClient(address) as client:
        for uuid in char_uuids:
            try:
                data = await client.read_gatt_char(uuid)
                results[uuid] = data
            except Exception as e:
                results[uuid] = None
                print(f"UUID {uuid} ์ฝ๊ธฐ ์‹คํŒจ: {e}")

    return results

if __name__ == "__main__":
    address = "AA:BB:CC:DD:EE:FF"

    # ๋ฐฐํ„ฐ๋ฆฌ ๋ ˆ๋ฒจ
    level = asyncio.run(read_battery_level(address))
    if level is not None:
        print(f"๋ฐฐํ„ฐ๋ฆฌ: {level}%")

    # ์˜จ๋„
    temp = asyncio.run(read_temperature(address))
    if temp is not None:
        print(f"์˜จ๋„: {temp}ยฐC")

4.2 ์•Œ๋ฆผ(Notification) ์ˆ˜์‹ 

#!/usr/bin/env python3
"""BLE ์•Œ๋ฆผ ์ˆ˜์‹  (์‹ค์‹œ๊ฐ„ ์„ผ์„œ ๋ฐ์ดํ„ฐ)"""

import asyncio
from bleak import BleakClient
from datetime import datetime

# ์˜ˆ์‹œ UUID (์žฅ์น˜์— ๋”ฐ๋ผ ๋‹ค๋ฆ„)
HEART_RATE_UUID = "00002a37-0000-1000-8000-00805f9b34fb"

def notification_handler(sender, data):
    """์•Œ๋ฆผ ์ˆ˜์‹  ์ฝœ๋ฐฑ"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] ์ˆ˜์‹  ({sender}): {data.hex()}")

    # ๋ฐ์ดํ„ฐ ํŒŒ์‹ฑ (์˜ˆ: Heart Rate Measurement)
    flags = data[0]
    if flags & 0x01:  # 16-bit heart rate
        hr = int.from_bytes(data[1:3], 'little')
    else:  # 8-bit heart rate
        hr = data[1]

    print(f"  ์‹ฌ๋ฐ•์ˆ˜: {hr} bpm")

async def subscribe_notifications(address: str, char_uuid: str, duration: float = 60):
    """์•Œ๋ฆผ ๊ตฌ๋…"""
    async with BleakClient(address) as client:
        print(f"์—ฐ๊ฒฐ๋จ: {address}")

        # ์•Œ๋ฆผ ์‹œ์ž‘
        await client.start_notify(char_uuid, notification_handler)
        print(f"์•Œ๋ฆผ ๊ตฌ๋… ์‹œ์ž‘: {char_uuid}")

        # ์ง€์ •๋œ ์‹œ๊ฐ„ ๋™์•ˆ ์ˆ˜์‹ 
        await asyncio.sleep(duration)

        # ์•Œ๋ฆผ ์ค‘์ง€
        await client.stop_notify(char_uuid)
        print("์•Œ๋ฆผ ๊ตฌ๋… ์ข…๋ฃŒ")

async def subscribe_multiple(address: str, char_uuids: list, duration: float = 60):
    """์—ฌ๋Ÿฌ ํŠน์„ฑ ์•Œ๋ฆผ ๊ตฌ๋…"""
    async with BleakClient(address) as client:
        print(f"์—ฐ๊ฒฐ๋จ: {address}")

        for uuid in char_uuids:
            await client.start_notify(uuid, notification_handler)
            print(f"๊ตฌ๋…: {uuid}")

        await asyncio.sleep(duration)

        for uuid in char_uuids:
            await client.stop_notify(uuid)

if __name__ == "__main__":
    asyncio.run(subscribe_notifications("AA:BB:CC:DD:EE:FF", HEART_RATE_UUID, 30))

4.3 ํŠน์„ฑ ๊ฐ’ ์“ฐ๊ธฐ

#!/usr/bin/env python3
"""BLE ํŠน์„ฑ์— ๊ฐ’ ์“ฐ๊ธฐ"""

import asyncio
from bleak import BleakClient

async def write_characteristic(address: str, char_uuid: str, data: bytes):
    """ํŠน์„ฑ์— ๊ฐ’ ์“ฐ๊ธฐ (์‘๋‹ต ์žˆ์Œ)"""
    async with BleakClient(address) as client:
        await client.write_gatt_char(char_uuid, data, response=True)
        print(f"์“ฐ๊ธฐ ์™„๋ฃŒ: {data.hex()}")

async def write_without_response(address: str, char_uuid: str, data: bytes):
    """ํŠน์„ฑ์— ๊ฐ’ ์“ฐ๊ธฐ (์‘๋‹ต ์—†์Œ - ๋น ๋ฆ„)"""
    async with BleakClient(address) as client:
        await client.write_gatt_char(char_uuid, data, response=False)
        print(f"์“ฐ๊ธฐ ์ „์†ก: {data.hex()}")

async def toggle_led(address: str, led_uuid: str, state: bool):
    """LED ์ œ์–ด ์˜ˆ์ œ"""
    data = bytes([0x01 if state else 0x00])
    await write_characteristic(address, led_uuid, data)

async def set_sensor_interval(address: str, config_uuid: str, interval_ms: int):
    """์„ผ์„œ ์ธก์ • ์ฃผ๊ธฐ ์„ค์ •"""
    # 2๋ฐ”์ดํŠธ ๋ฆฌํ‹€์—”๋””์•ˆ
    data = interval_ms.to_bytes(2, 'little')
    await write_characteristic(address, config_uuid, data)
    print(f"์ธก์ • ์ฃผ๊ธฐ ์„ค์ •: {interval_ms}ms")

if __name__ == "__main__":
    # LED ํ† ๊ธ€
    asyncio.run(toggle_led("AA:BB:CC:DD:EE:FF", "custom-led-uuid", True))

5. ์ข…ํ•ฉ ์˜ˆ์ œ: BLE ์„ผ์„œ ๋ชจ๋‹ˆํ„ฐ

#!/usr/bin/env python3
"""BLE ํ™˜๊ฒฝ ์„ผ์„œ ๋ชจ๋‹ˆํ„ฐ"""

import asyncio
from bleak import BleakClient, BleakScanner
from datetime import datetime
import struct

class BLESensorMonitor:
    """BLE ํ™˜๊ฒฝ ์„ผ์„œ ๋ชจ๋‹ˆํ„ฐ๋ง ํด๋ž˜์Šค"""

    # ํ‘œ์ค€ Environmental Sensing ์„œ๋น„์Šค
    ENV_SENSING_SERVICE = "0000181a-0000-1000-8000-00805f9b34fb"
    TEMPERATURE_CHAR = "00002a6e-0000-1000-8000-00805f9b34fb"
    HUMIDITY_CHAR = "00002a6f-0000-1000-8000-00805f9b34fb"

    def __init__(self, device_address: str = None, device_name: str = None):
        self.device_address = device_address
        self.device_name = device_name
        self.client = None
        self.data_buffer = []

    async def find_device(self) -> str | None:
        """์žฅ์น˜ ๊ฒ€์ƒ‰"""
        if self.device_address:
            return self.device_address

        if self.device_name:
            print(f"์žฅ์น˜ ๊ฒ€์ƒ‰: {self.device_name}")
            device = await BleakScanner.find_device_by_name(self.device_name)
            if device:
                self.device_address = device.address
                return device.address

        return None

    def _handle_temperature(self, sender, data):
        """์˜จ๋„ ๋ฐ์ดํ„ฐ ํ•ธ๋“ค๋Ÿฌ"""
        # 0.01๋„ ๋‹จ์œ„ 16-bit ์ •์ˆ˜
        temp = struct.unpack('<h', data[:2])[0] * 0.01
        timestamp = datetime.now()

        self.data_buffer.append({
            'type': 'temperature',
            'value': temp,
            'unit': 'ยฐC',
            'timestamp': timestamp
        })

        print(f"[{timestamp.strftime('%H:%M:%S')}] ์˜จ๋„: {temp:.2f}ยฐC")

    def _handle_humidity(self, sender, data):
        """์Šต๋„ ๋ฐ์ดํ„ฐ ํ•ธ๋“ค๋Ÿฌ"""
        # 0.01% ๋‹จ์œ„ 16-bit ์ •์ˆ˜
        humidity = struct.unpack('<H', data[:2])[0] * 0.01
        timestamp = datetime.now()

        self.data_buffer.append({
            'type': 'humidity',
            'value': humidity,
            'unit': '%',
            'timestamp': timestamp
        })

        print(f"[{timestamp.strftime('%H:%M:%S')}] ์Šต๋„: {humidity:.2f}%")

    async def start_monitoring(self, duration: float = 60):
        """๋ชจ๋‹ˆํ„ฐ๋ง ์‹œ์ž‘"""
        address = await self.find_device()
        if not address:
            print("์žฅ์น˜๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.")
            return

        print(f"์—ฐ๊ฒฐ ์ค‘: {address}")

        async with BleakClient(address) as client:
            self.client = client
            print("์—ฐ๊ฒฐ๋จ!")

            # ์„œ๋น„์Šค ํ™•์ธ
            services = client.services
            has_env_sensing = any(
                self.ENV_SENSING_SERVICE in str(s.uuid)
                for s in services
            )

            if not has_env_sensing:
                print("Environmental Sensing ์„œ๋น„์Šค๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.")
                print("์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์„œ๋น„์Šค:")
                for s in services:
                    print(f"  - {s.uuid}")
                return

            # ์•Œ๋ฆผ ๊ตฌ๋…
            try:
                await client.start_notify(self.TEMPERATURE_CHAR, self._handle_temperature)
                print("์˜จ๋„ ์•Œ๋ฆผ ๊ตฌ๋… ์‹œ์ž‘")
            except Exception as e:
                print(f"์˜จ๋„ ๊ตฌ๋… ์‹คํŒจ: {e}")

            try:
                await client.start_notify(self.HUMIDITY_CHAR, self._handle_humidity)
                print("์Šต๋„ ์•Œ๋ฆผ ๊ตฌ๋… ์‹œ์ž‘")
            except Exception as e:
                print(f"์Šต๋„ ๊ตฌ๋… ์‹คํŒจ: {e}")

            print(f"\n๋ชจ๋‹ˆํ„ฐ๋ง ์ค‘... ({duration}์ดˆ)")
            await asyncio.sleep(duration)

            # ์ •๋ฆฌ
            await client.stop_notify(self.TEMPERATURE_CHAR)
            await client.stop_notify(self.HUMIDITY_CHAR)

        print("\n=== ๋ชจ๋‹ˆํ„ฐ๋ง ์ข…๋ฃŒ ===")
        print(f"์ˆ˜์ง‘๋œ ๋ฐ์ดํ„ฐ: {len(self.data_buffer)}๊ฐœ")

    async def read_once(self) -> dict:
        """ํ•œ ๋ฒˆ ์ฝ๊ธฐ"""
        address = await self.find_device()
        if not address:
            return {}

        async with BleakClient(address) as client:
            result = {}

            try:
                data = await client.read_gatt_char(self.TEMPERATURE_CHAR)
                result['temperature'] = struct.unpack('<h', data[:2])[0] * 0.01
            except:
                pass

            try:
                data = await client.read_gatt_char(self.HUMIDITY_CHAR)
                result['humidity'] = struct.unpack('<H', data[:2])[0] * 0.01
            except:
                pass

            return result

    def get_summary(self) -> dict:
        """์ˆ˜์ง‘๋œ ๋ฐ์ดํ„ฐ ์š”์•ฝ"""
        if not self.data_buffer:
            return {}

        temps = [d['value'] for d in self.data_buffer if d['type'] == 'temperature']
        humids = [d['value'] for d in self.data_buffer if d['type'] == 'humidity']

        summary = {}

        if temps:
            summary['temperature'] = {
                'min': min(temps),
                'max': max(temps),
                'avg': sum(temps) / len(temps),
                'count': len(temps)
            }

        if humids:
            summary['humidity'] = {
                'min': min(humids),
                'max': max(humids),
                'avg': sum(humids) / len(humids),
                'count': len(humids)
            }

        return summary

if __name__ == "__main__":
    # ์žฅ์น˜ ์ด๋ฆ„์œผ๋กœ ๊ฒ€์ƒ‰
    monitor = BLESensorMonitor(device_name="EnvSensor")

    # ๋˜๋Š” MAC ์ฃผ์†Œ๋กœ ์ง์ ‘ ์ง€์ •
    # monitor = BLESensorMonitor(device_address="AA:BB:CC:DD:EE:FF")

    try:
        asyncio.run(monitor.start_monitoring(duration=30))
    except KeyboardInterrupt:
        print("\n์‚ฌ์šฉ์ž ์ค‘๋‹จ")

    # ์š”์•ฝ ์ถœ๋ ฅ
    summary = monitor.get_summary()
    if summary:
        print("\n=== ๋ฐ์ดํ„ฐ ์š”์•ฝ ===")
        for sensor, stats in summary.items():
            print(f"{sensor}:")
            print(f"  ์ตœ์†Œ: {stats['min']:.2f}")
            print(f"  ์ตœ๋Œ€: {stats['max']:.2f}")
            print(f"  ํ‰๊ท : {stats['avg']:.2f}")

์—ฐ์Šต ๋ฌธ์ œ

๋ฌธ์ œ 1: BLE ์Šค์บ๋„ˆ

  1. ์ฃผ๋ณ€ BLE ์žฅ์น˜๋ฅผ ์Šค์บ”ํ•˜๋Š” ํ”„๋กœ๊ทธ๋žจ์„ ์ž‘์„ฑํ•˜์„ธ์š”.
  2. RSSI ๊ฐ’ ๊ธฐ์ค€์œผ๋กœ ์ •๋ ฌํ•˜์—ฌ ์ถœ๋ ฅํ•˜์„ธ์š”.

๋ฌธ์ œ 2: ์‹ฌ๋ฐ•์ˆ˜ ๋ชจ๋‹ˆํ„ฐ

  1. ์‹ฌ๋ฐ•์ˆ˜ ์„ผ์„œ(Heart Rate Service: 0x180D)์— ์—ฐ๊ฒฐํ•˜์„ธ์š”.
  2. ์‹ค์‹œ๊ฐ„ ์‹ฌ๋ฐ•์ˆ˜๋ฅผ ์ฝ˜์†”์— ์ถœ๋ ฅํ•˜์„ธ์š”.

๋ฌธ์ œ 3: ๋ฐ์ดํ„ฐ ๋กœ๊น…

  1. BLE ์˜จ๋„ ์„ผ์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜์„ธ์š”.
  2. ๋ฐ์ดํ„ฐ๋ฅผ CSV ํŒŒ์ผ์— ์ €์žฅํ•˜์„ธ์š”.

๋‹ค์Œ ๋‹จ๊ณ„


์ตœ์ข… ์—…๋ฐ์ดํŠธ: 2026-02-01

to navigate between lessons