mqtt_publisher.py

Download
python 145 lines 4.5 KB
#!/usr/bin/env python3
"""
MQTT Publisher Example
MQTT 발행자 예제

Publishes simulated sensor data to an MQTT broker.

Install:
    pip install paho-mqtt

Usage:
    python3 mqtt_publisher.py
    python3 mqtt_publisher.py --broker 192.168.1.100 --topic sensor/temp
"""
 15
 16import paho.mqtt.client as mqtt
 17import json
 18import time
 19import random
 20import argparse
 21from datetime import datetime
 22
 23# Default configuration
 24DEFAULT_BROKER = "localhost"
 25DEFAULT_PORT = 1883
 26DEFAULT_TOPIC = "sensor/temperature"
 27DEFAULT_INTERVAL = 5
 28DEFAULT_SENSOR_ID = "temp_sensor_01"
 29
 30class MQTTPublisher:
 31    """MQTT Sensor Data Publisher"""
 32
 33    def __init__(self, broker: str, port: int, sensor_id: str):
 34        self.broker = broker
 35        self.port = port
 36        self.sensor_id = sensor_id
 37
 38        # Create MQTT client
 39        self.client = mqtt.Client(client_id=f"publisher_{sensor_id}")
 40        self.client.on_connect = self._on_connect
 41        self.client.on_publish = self._on_publish
 42        self.client.on_disconnect = self._on_disconnect
 43
 44        self.connected = False
 45        self.message_count = 0
 46
 47    def _on_connect(self, client, userdata, flags, rc):
 48        """Connection callback"""
 49        if rc == 0:
 50            print(f"Connected to MQTT broker: {self.broker}:{self.port}")
 51            self.connected = True
 52        else:
 53            print(f"Connection failed with code: {rc}")
 54
 55    def _on_publish(self, client, userdata, mid):
 56        """Publish callback"""
 57        self.message_count += 1
 58        print(f"  [Published] Message ID: {mid}, Total: {self.message_count}")
 59
 60    def _on_disconnect(self, client, userdata, rc):
 61        """Disconnection callback"""
 62        print(f"Disconnected from broker (rc={rc})")
 63        self.connected = False
 64
 65    def connect(self):
 66        """Connect to broker"""
 67        print(f"Connecting to {self.broker}:{self.port}...")
 68        self.client.connect(self.broker, self.port, keepalive=60)
 69        self.client.loop_start()
 70
 71        # Wait for connection
 72        timeout = 5
 73        while not self.connected and timeout > 0:
 74            time.sleep(1)
 75            timeout -= 1
 76
 77        if not self.connected:
 78            raise ConnectionError("Failed to connect to MQTT broker")
 79
 80    def disconnect(self):
 81        """Disconnect from broker"""
 82        self.client.loop_stop()
 83        self.client.disconnect()
 84        print("Disconnected from broker")
 85
 86    def publish(self, topic: str, data: dict, qos: int = 1, retain: bool = False):
 87        """Publish message to topic"""
 88        payload = json.dumps(data)
 89        result = self.client.publish(topic, payload, qos=qos, retain=retain)
 90
 91        if result.rc == mqtt.MQTT_ERR_SUCCESS:
 92            print(f"Published to {topic}: {payload}")
 93            return True
 94        else:
 95            print(f"Publish failed with code: {result.rc}")
 96            return False
 97
 98    def generate_sensor_data(self) -> dict:
 99        """Generate simulated sensor data"""
100        return {
101            "sensor_id": self.sensor_id,
102            "temperature": round(20 + random.uniform(-5, 10), 1),
103            "humidity": round(50 + random.uniform(-20, 20), 1),
104            "timestamp": datetime.now().isoformat()
105        }
106
def main():
    """Parse command-line options and publish readings until interrupted.

    Connects an ``MQTTPublisher`` to the requested broker, then publishes
    one simulated reading every ``--interval`` seconds (QoS 1, retained)
    until Ctrl+C. The publisher is always disconnected on the way out and
    the number of publish callbacks received is printed.
    """
    parser = argparse.ArgumentParser(description="MQTT Publisher")
    parser.add_argument("--broker", default=DEFAULT_BROKER, help="MQTT broker address")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="MQTT broker port")
    parser.add_argument("--topic", default=DEFAULT_TOPIC, help="Topic to publish to")
    parser.add_argument("--interval", type=int, default=DEFAULT_INTERVAL, help="Publish interval (seconds)")
    parser.add_argument("--sensor-id", default=DEFAULT_SENSOR_ID, help="Sensor ID")
    args = parser.parse_args()

    # time.sleep() raises ValueError for negative values; reject them up
    # front with a normal usage error instead of crashing mid-run.
    if args.interval < 0:
        parser.error("--interval must be non-negative")

    print("=== MQTT Publisher ===")
    print(f"Broker:   {args.broker}:{args.port}")
    print(f"Topic:    {args.topic}")
    print(f"Interval: {args.interval} seconds")
    print(f"Sensor:   {args.sensor_id}")
    print()

    publisher = MQTTPublisher(args.broker, args.port, args.sensor_id)

    try:
        publisher.connect()

        print("Publishing sensor data... (Ctrl+C to stop)")
        while True:
            data = publisher.generate_sensor_data()
            publisher.publish(args.topic, data, qos=1, retain=True)
            time.sleep(args.interval)

    except OSError as e:
        # ConnectionError is an OSError subclass, as are socket.gaierror
        # (unresolvable host) and socket timeouts, so all connection-level
        # failures are reported here instead of escaping as a traceback.
        print(f"Connection error: {e}")
    except KeyboardInterrupt:
        print("\nStopped by user")
    finally:
        publisher.disconnect()
        print(f"Total messages published: {publisher.message_count}")
142
# Run the publisher only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()