05. BLE ์ฐ๊ฒฐ
05. BLE ์ฐ๊ฒฐ¶
ํ์ต ๋ชฉํ¶
- BLE(Bluetooth Low Energy) ํ๋กํ ์ฝ ๊ฐ์ ์ดํด
- GATT ๊ตฌ์กฐ (์๋น์ค, ํน์ฑ) ํ์
- Python bleak ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ฌ์ฉ๋ฒ ์ต๋
- BLE ์ฅ์น ์ค์บ ๋ฐ ์ฐ๊ฒฐ
- ์ผ์ ๋ฐ์ดํฐ ์์
1. BLE ํ๋กํ ์ฝ ๊ฐ์¶
1.1 BLE vs ํด๋์ Bluetooth¶
| ํน์ฑ | BLE (Bluetooth Low Energy) | ํด๋์ Bluetooth |
|---|---|---|
| ์ ๋ ฅ ์๋น | ๋งค์ฐ ๋ฎ์ | ๋์ |
| ๋ฐ์ดํฐ ์ ์ก๋ฅ | 1-2 Mbps | 1-3 Mbps |
| ๋ฒ์ | ~100m | ~100m |
| ์ง์ฐ ์๊ฐ | ~6ms | ~100ms |
| ํ์ด๋ง | ๊ฐ๋จ/์๋ | ๋ณต์ก |
| ์ฉ๋ | IoT ์ผ์, ์จ์ด๋ฌ๋ธ | ์ค๋์ค, ํ์ผ ์ ์ก |
1.2 BLE ํ๋กํ ์ฝ ์คํ¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ BLE ํ๋กํ ์ฝ ์คํ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Application โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ GAP (Generic Access Profile) โ โ
โ โ ๋๋ฐ์ด์ค ๊ฒ์, ์ฐ๊ฒฐ, ๋ณด์ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ GATT (Generic Attribute Profile) โ โ
โ โ ์๋น์ค, ํน์ฑ, ๋ฐ์ดํฐ ๊ตํ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค โ
โ โ ATT (Attribute Protocol) โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ L2CAP โ โ
โ โ ๋
ผ๋ฆฌ ๋งํฌ ์ ์ด ๋ฐ ์ ์ ํ๋กํ ์ฝ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Link Layer + Physical Layer โ โ
โ โ ๋ฌด์ ํต์ ์ฒ๋ฆฌ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
1.3 BLE ์ญํ ¶
# BLE ์ญํ ์ ์
ble_roles = {
"Central (Master)": {
"description": "๋ค๋ฅธ ์ฅ์น๋ฅผ ์ค์บํ๊ณ ์ฐ๊ฒฐ์ ์์",
"example": "์ค๋งํธํฐ, ๋ผ์ฆ๋ฒ ๋ฆฌํ์ด",
"behavior": ["์ค์บ", "์ฐ๊ฒฐ ์์ฒญ", "๋ฐ์ดํฐ ์์ฒญ"]
},
"Peripheral (Slave)": {
"description": "๊ด๊ณ ํ๊ณ ์ฐ๊ฒฐ์ ๊ธฐ๋ค๋ฆผ",
"example": "์ผ์, ๋น์ฝ, ์จ์ด๋ฌ๋ธ",
"behavior": ["๊ด๊ณ ", "์ฐ๊ฒฐ ๋๊ธฐ", "๋ฐ์ดํฐ ์ ๊ณต"]
},
"Observer": {
"description": "๊ด๊ณ ํจํท๋ง ์์ (์ฐ๊ฒฐ ์์)",
"example": "๋น์ฝ ๋ฆฌ๋",
"behavior": ["์ค์บ๋ง"]
},
"Broadcaster": {
"description": "๊ด๊ณ ํจํท๋ง ์ก์ (์ฐ๊ฒฐ ์์)",
"example": "๋น์ฝ",
"behavior": ["๊ด๊ณ ๋ง"]
}
}
2. GATT ๊ตฌ์กฐ¶
2.1 GATT ๊ณ์ธต ๊ตฌ์กฐ¶
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ GATT ๊ณ์ธต ๊ตฌ์กฐ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ GATT Server (Peripheral) โ
โ โ โ
โ โโโ Profile โ
โ โ โ โ
โ โ โโโ Service (UUID: 0x180F - Battery Service) โ
โ โ โ โ โ
โ โ โ โโโ Characteristic (UUID: 0x2A19 - Battery Level)โ
โ โ โ โโโ Value: 85 (0-100%) โ
โ โ โ โโโ Properties: Read, Notify โ
โ โ โ โโโ Descriptors โ
โ โ โ โโโ CCCD (Client Config Descriptor) โ
โ โ โ โ
โ โ โโโ Service (UUID: 0x181A - Environmental Sensing) โ
โ โ โ โ
โ โ โโโ Characteristic: Temperature (0x2A6E) โ
โ โ โ โโโ Value: 25.5ยฐC โ
โ โ โ โ
โ โ โโโ Characteristic: Humidity (0x2A6F) โ
โ โ โโโ Value: 60% โ
โ โ โ
โ โโโ ... โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
2.2 ํ์ค UUID¶
# ํ์ค BLE ์๋น์ค UUID (16-bit)
standard_services = {
"0x1800": "Generic Access",
"0x1801": "Generic Attribute",
"0x180A": "Device Information",
"0x180F": "Battery Service",
"0x181A": "Environmental Sensing",
"0x180D": "Heart Rate",
}
# ํ์ค ํน์ฑ UUID
standard_characteristics = {
"0x2A00": "Device Name",
"0x2A19": "Battery Level",
"0x2A6E": "Temperature",
"0x2A6F": "Humidity",
"0x2A37": "Heart Rate Measurement",
}
# 16-bit UUID๋ฅผ 128-bit๋ก ๋ณํ
def uuid_16_to_128(uuid_16: str) -> str:
"""16-bit UUID๋ฅผ 128-bit BLE ๊ธฐ๋ณธ UUID๋ก ๋ณํ"""
base_uuid = "00000000-0000-1000-8000-00805f9b34fb"
uuid_16_clean = uuid_16.replace("0x", "").lower()
return f"0000{uuid_16_clean}{base_uuid[8:]}"
# ์: 0x180F -> 0000180f-0000-1000-8000-00805f9b34fb
2.3 ํน์ฑ ์์ฑ¶
# ํน์ฑ ์์ฑ (Properties)
characteristic_properties = {
"Broadcast": 0x01, # ๊ด๊ณ ์ ํฌํจ ๊ฐ๋ฅ
"Read": 0x02, # ์ฝ๊ธฐ ๊ฐ๋ฅ
"Write No Response": 0x04, # ์๋ต ์์ด ์ฐ๊ธฐ
"Write": 0x08, # ์๋ต ์๋ ์ฐ๊ธฐ
"Notify": 0x10, # ์๋ฆผ (์๋ต ์์)
"Indicate": 0x20, # ํ์ (์๋ต ์์)
}
def parse_properties(props: int) -> list:
"""์์ฑ ๋นํธ๋ง์คํฌ๋ฅผ ๋ฆฌ์คํธ๋ก ๋ณํ"""
result = []
for name, value in characteristic_properties.items():
if props & value:
result.append(name)
return result
# ์: parse_properties(0x12) -> ['Read', 'Notify']
3. bleak ๋ผ์ด๋ธ๋ฌ๋ฆฌ¶
3.1 ์ค์น ๋ฐ ์ค์ ¶
# bleak ์ค์น
pip install bleak
# Linux์์ ์ถ๊ฐ ์ค์ (bluetoothctl ์ ๊ทผ ๊ถํ)
sudo usermod -a -G bluetooth $USER
# D-Bus ์๋น์ค ํ์ธ
sudo systemctl status bluetooth
3.2 BLE ์ฅ์น ์ค์บ¶
#!/usr/bin/env python3
"""BLE ์ฅ์น ์ค์บ (bleak)"""
import asyncio
from bleak import BleakScanner
async def scan_devices(timeout: float = 10.0):
"""์ฃผ๋ณ BLE ์ฅ์น ์ค์บ"""
print(f"BLE ์ฅ์น ์ค์บ ์ค... ({timeout}์ด)")
devices = await BleakScanner.discover(timeout=timeout)
print(f"\n๋ฐ๊ฒฌ๋ ์ฅ์น: {len(devices)}๊ฐ\n")
for device in devices:
rssi = device.rssi if hasattr(device, 'rssi') else 'N/A'
print(f" ์ด๋ฆ: {device.name or 'Unknown'}")
print(f" ์ฃผ์: {device.address}")
print(f" RSSI: {rssi} dBm")
print()
return devices
async def scan_with_filter(name_filter: str = None):
"""์ด๋ฆ ํํฐ๋ก ์ค์บ"""
devices = await BleakScanner.discover()
if name_filter:
devices = [d for d in devices if d.name and name_filter.lower() in d.name.lower()]
return devices
async def continuous_scan(callback=None, duration: float = 30.0):
"""์ฐ์ ์ค์บ (์ฅ์น ๋ฐ๊ฒฌ ์ ์ฝ๋ฐฑ)"""
def detection_callback(device, advertisement_data):
print(f"๋ฐ๊ฒฌ: {device.name} ({device.address})")
if callback:
callback(device, advertisement_data)
scanner = BleakScanner(detection_callback=detection_callback)
print(f"์ฐ์ ์ค์บ ์์ ({duration}์ด)")
await scanner.start()
await asyncio.sleep(duration)
await scanner.stop()
if __name__ == "__main__":
asyncio.run(scan_devices(10))
3.3 BLE ์ฅ์น ์ฐ๊ฒฐ¶
#!/usr/bin/env python3
"""BLE ์ฅ์น ์ฐ๊ฒฐ ๋ฐ ์๋น์ค ํ์"""
import asyncio
from bleak import BleakClient, BleakScanner
async def connect_and_explore(address: str):
"""์ฅ์น ์ฐ๊ฒฐ ํ ์๋น์ค/ํน์ฑ ํ์"""
print(f"์ฐ๊ฒฐ ์ค: {address}")
async with BleakClient(address) as client:
print(f"์ฐ๊ฒฐ๋จ! MTU: {client.mtu_size}")
# ์๋น์ค ํ์
for service in client.services:
print(f"\n์๋น์ค: {service.uuid}")
print(f" ์ค๋ช
: {service.description}")
# ํน์ฑ ํ์
for char in service.characteristics:
print(f" ํน์ฑ: {char.uuid}")
print(f" ์์ฑ: {char.properties}")
# ์ฝ๊ธฐ ๊ฐ๋ฅํ๋ฉด ๊ฐ ์ฝ๊ธฐ
if "read" in char.properties:
try:
value = await client.read_gatt_char(char.uuid)
print(f" ๊ฐ: {value}")
except Exception as e:
print(f" ์ฝ๊ธฐ ์คํจ: {e}")
async def find_and_connect(name_filter: str):
"""์ด๋ฆ์ผ๋ก ์ฅ์น ์ฐพ์ ์ฐ๊ฒฐ"""
print(f"์ฅ์น ๊ฒ์: '{name_filter}'")
device = await BleakScanner.find_device_by_name(name_filter)
if device:
print(f"์ฅ์น ๋ฐ๊ฒฌ: {device.address}")
await connect_and_explore(device.address)
else:
print("์ฅ์น๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.")
if __name__ == "__main__":
# MAC ์ฃผ์๋ก ์ง์ ์ฐ๊ฒฐ
# asyncio.run(connect_and_explore("AA:BB:CC:DD:EE:FF"))
# ์ด๋ฆ์ผ๋ก ๊ฒ์ ํ ์ฐ๊ฒฐ
asyncio.run(find_and_connect("Temperature"))
4. ์ผ์ ๋ฐ์ดํฐ ์์ ¶
4.1 ํน์ฑ ๊ฐ ์ฝ๊ธฐ¶
#!/usr/bin/env python3
"""BLE ํน์ฑ ๊ฐ ์ฝ๊ธฐ"""
import asyncio
from bleak import BleakClient
import struct
# ํ์ค UUID
BATTERY_LEVEL_UUID = "00002a19-0000-1000-8000-00805f9b34fb"
TEMPERATURE_UUID = "00002a6e-0000-1000-8000-00805f9b34fb"
async def read_battery_level(address: str) -> int | None:
"""๋ฐฐํฐ๋ฆฌ ๋ ๋ฒจ ์ฝ๊ธฐ"""
async with BleakClient(address) as client:
try:
data = await client.read_gatt_char(BATTERY_LEVEL_UUID)
# ๋ฐฐํฐ๋ฆฌ ๋ ๋ฒจ์ 1๋ฐ์ดํธ (0-100%)
return data[0]
except Exception as e:
print(f"์ฝ๊ธฐ ์คํจ: {e}")
return None
async def read_temperature(address: str) -> float | None:
"""์จ๋ ์ฝ๊ธฐ (IEEE 11073 ํ์)"""
async with BleakClient(address) as client:
try:
data = await client.read_gatt_char(TEMPERATURE_UUID)
# ์จ๋๋ 16-bit ๋ถํธ์๋ ์ ์ (0.01๋ ๋จ์)
temp_raw = struct.unpack('<h', data[:2])[0]
return temp_raw * 0.01
except Exception as e:
print(f"์ฝ๊ธฐ ์คํจ: {e}")
return None
async def read_multiple_chars(address: str, char_uuids: list) -> dict:
"""์ฌ๋ฌ ํน์ฑ ํ ๋ฒ์ ์ฝ๊ธฐ"""
results = {}
async with BleakClient(address) as client:
for uuid in char_uuids:
try:
data = await client.read_gatt_char(uuid)
results[uuid] = data
except Exception as e:
results[uuid] = None
print(f"UUID {uuid} ์ฝ๊ธฐ ์คํจ: {e}")
return results
if __name__ == "__main__":
address = "AA:BB:CC:DD:EE:FF"
# ๋ฐฐํฐ๋ฆฌ ๋ ๋ฒจ
level = asyncio.run(read_battery_level(address))
if level is not None:
print(f"๋ฐฐํฐ๋ฆฌ: {level}%")
# ์จ๋
temp = asyncio.run(read_temperature(address))
if temp is not None:
print(f"์จ๋: {temp}ยฐC")
4.2 ์๋ฆผ(Notification) ์์ ¶
#!/usr/bin/env python3
"""BLE ์๋ฆผ ์์ (์ค์๊ฐ ์ผ์ ๋ฐ์ดํฐ)"""
import asyncio
from bleak import BleakClient
from datetime import datetime
# ์์ UUID (์ฅ์น์ ๋ฐ๋ผ ๋ค๋ฆ)
HEART_RATE_UUID = "00002a37-0000-1000-8000-00805f9b34fb"
def notification_handler(sender, data):
"""์๋ฆผ ์์ ์ฝ๋ฐฑ"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] ์์ ({sender}): {data.hex()}")
# ๋ฐ์ดํฐ ํ์ฑ (์: Heart Rate Measurement)
flags = data[0]
if flags & 0x01: # 16-bit heart rate
hr = int.from_bytes(data[1:3], 'little')
else: # 8-bit heart rate
hr = data[1]
print(f" ์ฌ๋ฐ์: {hr} bpm")
async def subscribe_notifications(address: str, char_uuid: str, duration: float = 60):
"""์๋ฆผ ๊ตฌ๋
"""
async with BleakClient(address) as client:
print(f"์ฐ๊ฒฐ๋จ: {address}")
# ์๋ฆผ ์์
await client.start_notify(char_uuid, notification_handler)
print(f"์๋ฆผ ๊ตฌ๋
์์: {char_uuid}")
# ์ง์ ๋ ์๊ฐ ๋์ ์์
await asyncio.sleep(duration)
# ์๋ฆผ ์ค์ง
await client.stop_notify(char_uuid)
print("์๋ฆผ ๊ตฌ๋
์ข
๋ฃ")
async def subscribe_multiple(address: str, char_uuids: list, duration: float = 60):
"""์ฌ๋ฌ ํน์ฑ ์๋ฆผ ๊ตฌ๋
"""
async with BleakClient(address) as client:
print(f"์ฐ๊ฒฐ๋จ: {address}")
for uuid in char_uuids:
await client.start_notify(uuid, notification_handler)
print(f"๊ตฌ๋
: {uuid}")
await asyncio.sleep(duration)
for uuid in char_uuids:
await client.stop_notify(uuid)
if __name__ == "__main__":
asyncio.run(subscribe_notifications("AA:BB:CC:DD:EE:FF", HEART_RATE_UUID, 30))
4.3 ํน์ฑ ๊ฐ ์ฐ๊ธฐ¶
#!/usr/bin/env python3
"""BLE ํน์ฑ์ ๊ฐ ์ฐ๊ธฐ"""
import asyncio
from bleak import BleakClient
async def write_characteristic(address: str, char_uuid: str, data: bytes):
"""ํน์ฑ์ ๊ฐ ์ฐ๊ธฐ (์๋ต ์์)"""
async with BleakClient(address) as client:
await client.write_gatt_char(char_uuid, data, response=True)
print(f"์ฐ๊ธฐ ์๋ฃ: {data.hex()}")
async def write_without_response(address: str, char_uuid: str, data: bytes):
"""ํน์ฑ์ ๊ฐ ์ฐ๊ธฐ (์๋ต ์์ - ๋น ๋ฆ)"""
async with BleakClient(address) as client:
await client.write_gatt_char(char_uuid, data, response=False)
print(f"์ฐ๊ธฐ ์ ์ก: {data.hex()}")
async def toggle_led(address: str, led_uuid: str, state: bool):
"""LED ์ ์ด ์์ """
data = bytes([0x01 if state else 0x00])
await write_characteristic(address, led_uuid, data)
async def set_sensor_interval(address: str, config_uuid: str, interval_ms: int):
"""์ผ์ ์ธก์ ์ฃผ๊ธฐ ์ค์ """
# 2๋ฐ์ดํธ ๋ฆฌํ์๋์
data = interval_ms.to_bytes(2, 'little')
await write_characteristic(address, config_uuid, data)
print(f"์ธก์ ์ฃผ๊ธฐ ์ค์ : {interval_ms}ms")
if __name__ == "__main__":
# LED ํ ๊ธ
asyncio.run(toggle_led("AA:BB:CC:DD:EE:FF", "custom-led-uuid", True))
5. ์ข ํฉ ์์ : BLE ์ผ์ ๋ชจ๋ํฐ¶
#!/usr/bin/env python3
"""BLE ํ๊ฒฝ ์ผ์ ๋ชจ๋ํฐ"""
import asyncio
from bleak import BleakClient, BleakScanner
from datetime import datetime
import struct
class BLESensorMonitor:
"""BLE ํ๊ฒฝ ์ผ์ ๋ชจ๋ํฐ๋ง ํด๋์ค"""
# ํ์ค Environmental Sensing ์๋น์ค
ENV_SENSING_SERVICE = "0000181a-0000-1000-8000-00805f9b34fb"
TEMPERATURE_CHAR = "00002a6e-0000-1000-8000-00805f9b34fb"
HUMIDITY_CHAR = "00002a6f-0000-1000-8000-00805f9b34fb"
def __init__(self, device_address: str = None, device_name: str = None):
self.device_address = device_address
self.device_name = device_name
self.client = None
self.data_buffer = []
async def find_device(self) -> str | None:
"""์ฅ์น ๊ฒ์"""
if self.device_address:
return self.device_address
if self.device_name:
print(f"์ฅ์น ๊ฒ์: {self.device_name}")
device = await BleakScanner.find_device_by_name(self.device_name)
if device:
self.device_address = device.address
return device.address
return None
def _handle_temperature(self, sender, data):
"""์จ๋ ๋ฐ์ดํฐ ํธ๋ค๋ฌ"""
# 0.01๋ ๋จ์ 16-bit ์ ์
temp = struct.unpack('<h', data[:2])[0] * 0.01
timestamp = datetime.now()
self.data_buffer.append({
'type': 'temperature',
'value': temp,
'unit': 'ยฐC',
'timestamp': timestamp
})
print(f"[{timestamp.strftime('%H:%M:%S')}] ์จ๋: {temp:.2f}ยฐC")
def _handle_humidity(self, sender, data):
"""์ต๋ ๋ฐ์ดํฐ ํธ๋ค๋ฌ"""
# 0.01% ๋จ์ 16-bit ์ ์
humidity = struct.unpack('<H', data[:2])[0] * 0.01
timestamp = datetime.now()
self.data_buffer.append({
'type': 'humidity',
'value': humidity,
'unit': '%',
'timestamp': timestamp
})
print(f"[{timestamp.strftime('%H:%M:%S')}] ์ต๋: {humidity:.2f}%")
async def start_monitoring(self, duration: float = 60):
"""๋ชจ๋ํฐ๋ง ์์"""
address = await self.find_device()
if not address:
print("์ฅ์น๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.")
return
print(f"์ฐ๊ฒฐ ์ค: {address}")
async with BleakClient(address) as client:
self.client = client
print("์ฐ๊ฒฐ๋จ!")
# ์๋น์ค ํ์ธ
services = client.services
has_env_sensing = any(
self.ENV_SENSING_SERVICE in str(s.uuid)
for s in services
)
if not has_env_sensing:
print("Environmental Sensing ์๋น์ค๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.")
print("์ฌ์ฉ ๊ฐ๋ฅํ ์๋น์ค:")
for s in services:
print(f" - {s.uuid}")
return
# ์๋ฆผ ๊ตฌ๋
try:
await client.start_notify(self.TEMPERATURE_CHAR, self._handle_temperature)
print("์จ๋ ์๋ฆผ ๊ตฌ๋
์์")
except Exception as e:
print(f"์จ๋ ๊ตฌ๋
์คํจ: {e}")
try:
await client.start_notify(self.HUMIDITY_CHAR, self._handle_humidity)
print("์ต๋ ์๋ฆผ ๊ตฌ๋
์์")
except Exception as e:
print(f"์ต๋ ๊ตฌ๋
์คํจ: {e}")
print(f"\n๋ชจ๋ํฐ๋ง ์ค... ({duration}์ด)")
await asyncio.sleep(duration)
# ์ ๋ฆฌ
await client.stop_notify(self.TEMPERATURE_CHAR)
await client.stop_notify(self.HUMIDITY_CHAR)
print("\n=== ๋ชจ๋ํฐ๋ง ์ข
๋ฃ ===")
print(f"์์ง๋ ๋ฐ์ดํฐ: {len(self.data_buffer)}๊ฐ")
async def read_once(self) -> dict:
"""ํ ๋ฒ ์ฝ๊ธฐ"""
address = await self.find_device()
if not address:
return {}
async with BleakClient(address) as client:
result = {}
try:
data = await client.read_gatt_char(self.TEMPERATURE_CHAR)
result['temperature'] = struct.unpack('<h', data[:2])[0] * 0.01
except:
pass
try:
data = await client.read_gatt_char(self.HUMIDITY_CHAR)
result['humidity'] = struct.unpack('<H', data[:2])[0] * 0.01
except:
pass
return result
def get_summary(self) -> dict:
"""์์ง๋ ๋ฐ์ดํฐ ์์ฝ"""
if not self.data_buffer:
return {}
temps = [d['value'] for d in self.data_buffer if d['type'] == 'temperature']
humids = [d['value'] for d in self.data_buffer if d['type'] == 'humidity']
summary = {}
if temps:
summary['temperature'] = {
'min': min(temps),
'max': max(temps),
'avg': sum(temps) / len(temps),
'count': len(temps)
}
if humids:
summary['humidity'] = {
'min': min(humids),
'max': max(humids),
'avg': sum(humids) / len(humids),
'count': len(humids)
}
return summary
if __name__ == "__main__":
# ์ฅ์น ์ด๋ฆ์ผ๋ก ๊ฒ์
monitor = BLESensorMonitor(device_name="EnvSensor")
# ๋๋ MAC ์ฃผ์๋ก ์ง์ ์ง์
# monitor = BLESensorMonitor(device_address="AA:BB:CC:DD:EE:FF")
try:
asyncio.run(monitor.start_monitoring(duration=30))
except KeyboardInterrupt:
print("\n์ฌ์ฉ์ ์ค๋จ")
# ์์ฝ ์ถ๋ ฅ
summary = monitor.get_summary()
if summary:
print("\n=== ๋ฐ์ดํฐ ์์ฝ ===")
for sensor, stats in summary.items():
print(f"{sensor}:")
print(f" ์ต์: {stats['min']:.2f}")
print(f" ์ต๋: {stats['max']:.2f}")
print(f" ํ๊ท : {stats['avg']:.2f}")
์ฐ์ต ๋ฌธ์ ¶
๋ฌธ์ 1: BLE ์ค์บ๋¶
- ์ฃผ๋ณ BLE ์ฅ์น๋ฅผ ์ค์บํ๋ ํ๋ก๊ทธ๋จ์ ์์ฑํ์ธ์.
- RSSI ๊ฐ ๊ธฐ์ค์ผ๋ก ์ ๋ ฌํ์ฌ ์ถ๋ ฅํ์ธ์.
๋ฌธ์ 2: ์ฌ๋ฐ์ ๋ชจ๋ํฐ¶
- ์ฌ๋ฐ์ ์ผ์(Heart Rate Service: 0x180D)์ ์ฐ๊ฒฐํ์ธ์.
- ์ค์๊ฐ ์ฌ๋ฐ์๋ฅผ ์ฝ์์ ์ถ๋ ฅํ์ธ์.
๋ฌธ์ 3: ๋ฐ์ดํฐ ๋ก๊น ¶
- BLE ์จ๋ ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์์ ํ์ธ์.
- ๋ฐ์ดํฐ๋ฅผ CSV ํ์ผ์ ์ ์ฅํ์ธ์.
๋ค์ ๋จ๊ณ¶
- 06_MQTT_Protocol.md: BLE ๋ฐ์ดํฐ๋ฅผ MQTT๋ก ์ ์ก
- 10_Home_Automation_Project.md: BLE ์ค๋งํธํ ํ๋ก์ ํธ
์ต์ข ์ ๋ฐ์ดํธ: 2026-02-01