mqtt_subscriber.py

Download
python 157 lines 5.1 KB
  1#!/usr/bin/env python3
  2"""
  3MQTT Subscriber Example
  4MQTT 구독자 예제
  5
  6Subscribes to MQTT topics and receives sensor data.
  7
  8Install:
  9    pip install paho-mqtt
 10
 11Usage:
 12    python3 mqtt_subscriber.py
 13    python3 mqtt_subscriber.py --broker 192.168.1.100 --topic "sensor/#"
 14"""
 15
 16import paho.mqtt.client as mqtt
 17import json
 18import argparse
 19from datetime import datetime
 20
 21# Default configuration
 22DEFAULT_BROKER = "localhost"
 23DEFAULT_PORT = 1883
 24DEFAULT_TOPICS = ["sensor/#", "device/+/status"]
 25
 26class MQTTSubscriber:
 27    """MQTT Message Subscriber"""
 28
 29    def __init__(self, broker: str, port: int, topics: list):
 30        self.broker = broker
 31        self.port = port
 32        self.topics = topics
 33
 34        # Create MQTT client
 35        self.client = mqtt.Client(client_id="subscriber_monitor")
 36        self.client.on_connect = self._on_connect
 37        self.client.on_message = self._on_message
 38        self.client.on_disconnect = self._on_disconnect
 39
 40        self.connected = False
 41        self.message_count = 0
 42        self.message_handlers = {}
 43
 44    def _on_connect(self, client, userdata, flags, rc):
 45        """Connection callback"""
 46        if rc == 0:
 47            print(f"Connected to MQTT broker: {self.broker}:{self.port}")
 48            self.connected = True
 49
 50            # Subscribe to topics
 51            for topic in self.topics:
 52                client.subscribe(topic, qos=1)
 53                print(f"Subscribed to: {topic}")
 54        else:
 55            print(f"Connection failed with code: {rc}")
 56
 57    def _on_message(self, client, userdata, msg):
 58        """Message received callback"""
 59        self.message_count += 1
 60        timestamp = datetime.now().strftime("%H:%M:%S")
 61
 62        try:
 63            # Try to parse as JSON
 64            payload = json.loads(msg.payload.decode('utf-8'))
 65            self._display_json_message(timestamp, msg.topic, payload)
 66        except json.JSONDecodeError:
 67            # Display as plain text
 68            payload = msg.payload.decode('utf-8')
 69            self._display_text_message(timestamp, msg.topic, payload)
 70
 71        # Call custom handler if registered
 72        for pattern, handler in self.message_handlers.items():
 73            if mqtt.topic_matches_sub(pattern, msg.topic):
 74                handler(msg.topic, payload)
 75
 76    def _display_json_message(self, timestamp: str, topic: str, payload: dict):
 77        """Display formatted JSON message"""
 78        print(f"\n[{timestamp}] #{self.message_count}")
 79        print(f"Topic: {topic}")
 80        print(f"Data:")
 81        for key, value in payload.items():
 82            print(f"  {key}: {value}")
 83        print("-" * 50)
 84
 85    def _display_text_message(self, timestamp: str, topic: str, payload: str):
 86        """Display plain text message"""
 87        print(f"\n[{timestamp}] #{self.message_count}")
 88        print(f"Topic: {topic}")
 89        print(f"Message: {payload}")
 90        print("-" * 50)
 91
 92    def _on_disconnect(self, client, userdata, rc):
 93        """Disconnection callback"""
 94        print(f"Disconnected from broker (rc={rc})")
 95        self.connected = False
 96
 97    def add_handler(self, topic_pattern: str, handler):
 98        """Add custom message handler for topic pattern"""
 99        self.message_handlers[topic_pattern] = handler
100
101    def connect(self):
102        """Connect to broker"""
103        print(f"Connecting to {self.broker}:{self.port}...")
104        self.client.connect(self.broker, self.port, keepalive=60)
105
106    def run(self):
107        """Run subscriber loop"""
108        self.connect()
109        print("\nWaiting for messages... (Ctrl+C to stop)")
110        print("=" * 50)
111
112        try:
113            self.client.loop_forever()
114        except KeyboardInterrupt:
115            print("\nStopped by user")
116        finally:
117            self.client.disconnect()
118            print(f"Total messages received: {self.message_count}")
119
120# Example custom handlers
121def temperature_alert_handler(topic: str, payload: dict):
122    """Alert when temperature exceeds threshold"""
123    if isinstance(payload, dict):
124        temp = payload.get("temperature")
125        if temp and temp > 30:
126            print(f"  [ALERT] High temperature: {temp}°C")
127
128def motion_handler(topic: str, payload: dict):
129    """Handle motion detection events"""
130    if isinstance(payload, dict):
131        if payload.get("motion_detected"):
132            print(f"  [MOTION] Motion detected at {topic}")
133
134def main():
135    """Main function"""
136    parser = argparse.ArgumentParser(description="MQTT Subscriber")
137    parser.add_argument("--broker", default=DEFAULT_BROKER, help="MQTT broker address")
138    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="MQTT broker port")
139    parser.add_argument("--topic", nargs="+", default=DEFAULT_TOPICS, help="Topics to subscribe")
140    args = parser.parse_args()
141
142    print("=== MQTT Subscriber ===")
143    print(f"Broker: {args.broker}:{args.port}")
144    print(f"Topics: {', '.join(args.topic)}")
145    print()
146
147    subscriber = MQTTSubscriber(args.broker, args.port, args.topic)
148
149    # Add custom handlers
150    subscriber.add_handler("sensor/+/temperature", temperature_alert_handler)
151    subscriber.add_handler("sensor/+/motion", motion_handler)
152
153    subscriber.run()
154
155if __name__ == "__main__":
156    main()