1#!/usr/bin/env python3
2"""
3MQTT Subscriber Example
4MQTT 구독자 예제
5
6Subscribes to MQTT topics and receives sensor data.
7
8Install:
9 pip install paho-mqtt
10
11Usage:
12 python3 mqtt_subscriber.py
13 python3 mqtt_subscriber.py --broker 192.168.1.100 --topic "sensor/#"
14"""
15
16import paho.mqtt.client as mqtt
17import json
18import argparse
19from datetime import datetime
20
# Default configuration: fallback values for the --broker, --port and --topic
# command-line options parsed in main().
DEFAULT_BROKER = "localhost"  # broker address used when --broker is omitted
DEFAULT_PORT = 1883  # broker port used when --port is omitted
DEFAULT_TOPICS = ["sensor/#", "device/+/status"]  # topic filters used when --topic is omitted
25
class MQTTSubscriber:
    """Subscribe to MQTT topics, print every message, and run custom handlers.

    Each received payload is decoded as UTF-8 text and, when possible, parsed
    as JSON. JSON objects are printed key by key; everything else is printed
    as a single text line. Handlers registered with ``add_handler`` are then
    invoked for every topic pattern that matches the message topic.
    """

    def __init__(self, broker: str, port: int, topics: list,
                 client_id: str = "subscriber_monitor"):
        """Create the client and wire up its callbacks (does not connect).

        Args:
            broker: Broker address passed to ``client.connect``.
            port: Broker port passed to ``client.connect``.
            topics: Topic filters subscribed to on every successful connect.
            client_id: MQTT client identifier handed to the paho client.
        """
        self.broker = broker
        self.port = port
        self.topics = topics

        # paho-mqtt 2.x requires an explicit callback API version. VERSION1
        # keeps the (client, userdata, flags, rc) style signatures used by
        # the callbacks below; 1.x has no such enum and takes no such argument.
        if hasattr(mqtt, "CallbackAPIVersion"):
            self.client = mqtt.Client(
                mqtt.CallbackAPIVersion.VERSION1, client_id=client_id
            )
        else:
            self.client = mqtt.Client(client_id=client_id)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

        self.connected = False
        self.message_count = 0
        # Maps topic pattern -> callable(topic, payload).
        self.message_handlers = {}

    def _on_connect(self, client, userdata, flags, rc):
        """Connection callback: on success (rc == 0) subscribe to all topics."""
        if rc != 0:
            print(f"Connection failed with code: {rc}")
            return

        print(f"Connected to MQTT broker: {self.broker}:{self.port}")
        self.connected = True

        # Subscriptions are issued here so they are re-issued on every
        # successful (re)connect.
        for topic in self.topics:
            client.subscribe(topic, qos=1)
            print(f"Subscribed to: {topic}")

    @staticmethod
    def _decode_payload(raw: bytes):
        """Decode raw payload bytes into ``(text, payload)``.

        ``text`` is the UTF-8 decoding of ``raw``; undecodable bytes are
        replaced with U+FFFD instead of raising. ``payload`` is the parsed
        JSON value when ``text`` is valid JSON, otherwise ``text`` itself.
        """
        text = raw.decode("utf-8", errors="replace")
        try:
            return text, json.loads(text)
        except json.JSONDecodeError:
            return text, text

    def _on_message(self, client, userdata, msg):
        """Message callback: count, display, and dispatch one message."""
        self.message_count += 1
        timestamp = datetime.now().strftime("%H:%M:%S")

        text, payload = self._decode_payload(msg.payload)

        # Only JSON objects have key/value pairs to list; scalars, arrays and
        # non-JSON text are shown verbatim.
        if isinstance(payload, dict):
            self._display_json_message(timestamp, msg.topic, payload)
        else:
            self._display_text_message(timestamp, msg.topic, text)

        # Call every custom handler whose pattern matches this topic. A
        # failing handler is reported but must not abort the receive loop
        # or prevent the remaining handlers from running.
        for pattern, handler in self.message_handlers.items():
            if not mqtt.topic_matches_sub(pattern, msg.topic):
                continue
            try:
                handler(msg.topic, payload)
            except Exception as exc:
                print(f"Handler error for pattern '{pattern}': {exc}")

    def _display_json_message(self, timestamp: str, topic: str, payload: dict):
        """Print a JSON-object message, one ``key: value`` line per entry."""
        print(f"\n[{timestamp}] #{self.message_count}")
        print(f"Topic: {topic}")
        print("Data:")
        for key, value in payload.items():
            print(f" {key}: {value}")
        print("-" * 50)

    def _display_text_message(self, timestamp: str, topic: str, payload: str):
        """Print a message whose payload is shown as a single text line."""
        print(f"\n[{timestamp}] #{self.message_count}")
        print(f"Topic: {topic}")
        print(f"Message: {payload}")
        print("-" * 50)

    def _on_disconnect(self, client, userdata, rc):
        """Disconnection callback: report the result code and clear the flag."""
        print(f"Disconnected from broker (rc={rc})")
        self.connected = False

    def add_handler(self, topic_pattern: str, handler):
        """Register ``handler(topic, payload)`` for ``topic_pattern``.

        Registering the same pattern again replaces the previous handler.
        """
        self.message_handlers[topic_pattern] = handler

    def connect(self):
        """Connect to the configured broker with a 60 second keepalive."""
        print(f"Connecting to {self.broker}:{self.port}...")
        self.client.connect(self.broker, self.port, keepalive=60)

    def run(self):
        """Connect, then process messages until interrupted with Ctrl+C.

        The client is always disconnected and the message total printed when
        the loop ends, whether it ends normally, by Ctrl+C, or by an error.
        """
        self.connect()
        print("\nWaiting for messages... (Ctrl+C to stop)")
        print("=" * 50)

        try:
            self.client.loop_forever()
        except KeyboardInterrupt:
            print("\nStopped by user")
        finally:
            self.client.disconnect()
            print(f"Total messages received: {self.message_count}")
119
120# Example custom handlers
def temperature_alert_handler(topic: str, payload: dict, threshold: float = 30):
    """Print an alert when the reported temperature exceeds ``threshold``.

    Args:
        topic: Topic the message arrived on (not used in the alert text).
        payload: Message payload; only dicts carrying a numeric
            ``"temperature"`` entry are considered, anything else is ignored.
        threshold: Value the temperature must exceed to trigger the alert.
    """
    if not isinstance(payload, dict):
        return

    temp = payload.get("temperature")
    # JSON may deliver the reading as a string, null or boolean; comparing
    # those with a number would raise TypeError (or, for bool, be
    # meaningless), so only real numbers are checked.
    if isinstance(temp, bool) or not isinstance(temp, (int, float)):
        return

    if temp > threshold:
        print(f" [ALERT] High temperature: {temp}°C")
127
def motion_handler(topic: str, payload: dict):
    """Report a motion event when the payload flags ``motion_detected``.

    Payloads that are not dicts, or whose ``"motion_detected"`` entry is
    missing or falsy, produce no output.
    """
    motion_seen = isinstance(payload, dict) and payload.get("motion_detected")
    if not motion_seen:
        return
    print(f" [MOTION] Motion detected at {topic}")
133
def main():
    """Parse the command line, build the subscriber, and run it.

    Reads --broker, --port and --topic (one or more values), prints a short
    banner, registers the example temperature and motion handlers, and then
    hands control to the subscriber's receive loop.
    """
    cli = argparse.ArgumentParser(description="MQTT Subscriber")
    cli.add_argument("--broker", default=DEFAULT_BROKER, help="MQTT broker address")
    cli.add_argument("--port", type=int, default=DEFAULT_PORT, help="MQTT broker port")
    cli.add_argument("--topic", nargs="+", default=DEFAULT_TOPICS, help="Topics to subscribe")
    opts = cli.parse_args()

    # Startup banner, followed by one blank line.
    print(
        "=== MQTT Subscriber ===",
        f"Broker: {opts.broker}:{opts.port}",
        f"Topics: {', '.join(opts.topic)}",
        "",
        sep="\n",
    )

    monitor = MQTTSubscriber(opts.broker, opts.port, opts.topic)

    # Example custom handlers, keyed by the topic pattern they react to.
    example_handlers = (
        ("sensor/+/temperature", temperature_alert_handler),
        ("sensor/+/motion", motion_handler),
    )
    for pattern, callback in example_handlers:
        monitor.add_handler(pattern, callback)

    monitor.run()
154
# Run the subscriber only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()