sudo apt-get update
sudo apt-get install python3-requests mosquitto python3-paho-mqtt mosquitto-clients
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import requests
import json
import sys
from typing import Optional
class AlarmMonitor:
    """Relay MQTT alarm messages to a Telegram chat.

    The monitor subscribes to ``self.mqtt_topic`` on the configured MQTT
    broker and forwards every received message, formatted as text, to
    ``self.chat_id`` through the Telegram Bot API ``sendMessage`` endpoint.
    """

    # Seconds to wait for the Telegram API. Without a timeout a stalled
    # HTTP request would block the MQTT callback (and the loop) forever.
    TELEGRAM_TIMEOUT = 10

    def __init__(self):
        """Set the Telegram/MQTT settings and create the MQTT client."""
        # Telegram settings
        self.token = "YOUR_BOT_TOKEN"  # Replace with your bot token
        self.chat_id = "YOUR_CHAT_ID"  # Replace with your chat ID
        # MQTT settings
        self.mqtt_host = "localhost"
        self.mqtt_port = 1883
        self.mqtt_topic = "alarms/#"  # Subscribe to all alarm topics
        # Initialize MQTT client
        self.client = self._make_client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self._log("Alarm Monitor starting...")

    @staticmethod
    def _make_client():
        """Create an MQTT client that works with paho-mqtt 1.x and 2.x."""
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is None:
            # paho-mqtt 1.x: no callback API versions exist.
            return mqtt.Client()
        # paho-mqtt 2.x: request the v1 callback signatures, which are the
        # ones on_connect/on_message below are written against.
        return mqtt.Client(api_versions.VERSION1)

    @staticmethod
    def _log(text: str, error: bool = False) -> None:
        """Print *text* to stdout (or stderr when *error*) and flush.

        Flushing matters when stdout is not a terminal (e.g. under
        systemd): otherwise output is block-buffered and shows up late.
        """
        print(text, file=sys.stderr if error else sys.stdout, flush=True)

    def on_connect(self, client, userdata, flags, rc):
        """Callback when connected to MQTT broker.

        Subscribes to ``self.mqtt_topic`` when *rc* is 0 (success);
        otherwise reports the failure code.
        """
        if rc != 0:
            self._log(
                f"Failed to connect to MQTT broker with code: {rc}", error=True
            )
            return
        self._log("Connected to MQTT broker")
        # Subscribing here (not in run()) renews the subscription on every
        # successful (re)connect.
        self.client.subscribe(self.mqtt_topic)
        self._log(f"Subscribed to topic: {self.mqtt_topic}")

    def on_message(self, client, userdata, msg):
        """Callback when message is received.

        Decodes ``msg.payload``, parses it as JSON when possible (falling
        back to the plain text), formats it and sends it to Telegram.
        """
        try:
            # Undecodable bytes are replaced rather than raising, so an
            # alarm with a stray byte is still forwarded.
            payload = msg.payload.decode("utf-8", errors="replace")
            try:
                alarm_data = json.loads(payload)
            except json.JSONDecodeError:
                # If not JSON, treat as plain text
                alarm_data = payload
            self.send_telegram_message(
                self.format_alarm_message(msg.topic, alarm_data)
            )
        except Exception as e:
            # Callback boundary: report and carry on so that one bad
            # message cannot take the monitor down.
            self._log(f"Error processing message: {str(e)}", error=True)

    def format_alarm_message(self, topic: str, data: object) -> str:
        """Format alarm data into a readable message.

        Args:
            topic: MQTT topic the alarm arrived on.
            data: Parsed JSON value or raw payload text. A dict is
                rendered as one ``key: value`` line per item; anything
                else as a single ``Message:`` line.

        Returns:
            The newline-terminated notification text.
        """
        lines = ["🚨 ALARM NOTIFICATION 🚨", f"Topic: {topic}"]
        if isinstance(data, dict):
            lines.extend(f"{key}: {value}" for key, value in data.items())
        else:
            lines.append(f"Message: {data}")
        return "\n".join(lines) + "\n"

    def send_telegram_message(self, message: str) -> None:
        """Send *message* to the configured Telegram chat.

        Failures are reported on stderr and never raised.
        """
        url = f"https://api.telegram.org/bot{self.token}/sendMessage"
        data = {'chat_id': self.chat_id, 'text': message}
        try:
            response = requests.post(
                url, data=data, timeout=self.TELEGRAM_TIMEOUT
            )
        except requests.RequestException as e:
            detail = str(e)
            if self.token:
                # requests error text can include the request URL, which
                # embeds the bot token; keep the secret out of the logs.
                detail = detail.replace(self.token, "<redacted>")
            self._log(f"Error sending to Telegram: {detail}", error=True)
            return
        if not response.ok:
            self._log(
                f"Failed to send Telegram message: {response.text}", error=True
            )

    def run(self):
        """Start the monitor.

        Connects to the broker and blocks in the MQTT network loop. Exits
        the process with status 0 on Ctrl-C and 1 on any other error.
        """
        try:
            # Connect to MQTT broker (third argument: keepalive, seconds)
            self.client.connect(self.mqtt_host, self.mqtt_port, 60)
            # Start the loop
            self.client.loop_forever()
        except KeyboardInterrupt:
            self._log("\nMonitoring stopped")
            self.client.disconnect()
            sys.exit(0)
        except Exception as e:
            self._log(f"Error running monitor: {str(e)}", error=True)
            sys.exit(1)
if __name__ == "__main__":
    # Script entry point: build the monitor and block in its MQTT loop.
    AlarmMonitor().run()
sudo nano /etc/mosquitto/mosquitto.conf
Add this content:
listener 1883
allow_anonymous true
sudo nano /etc/systemd/system/mqtt_broker.service
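Add content (optional: the mosquitto package already installs mosquitto.service, which the systemctl commands below enable; do not run both units, as they would compete for port 1883):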
[Unit]
Description=Mosquitto MQTT Broker
After=network.target
[Service]
Type=simple
ExecStart=/usr/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf
Restart=always
[Install]
WantedBy=multi-user.target
sudo nano /etc/systemd/system/mqtt_telegram.service
Add content:
[Unit]
Description=MQTT-Telegram Alarm Integration
After=network.target mosquitto.service
[Service]
ExecStart=/usr/bin/python3 -u /full/path/to/mqtt_telegram.py
Restart=always
User=your_username
[Install]
WantedBy=multi-user.target
sudo systemctl enable mosquitto
sudo systemctl start mosquitto
sudo systemctl enable mqtt_telegram
sudo systemctl start mqtt_telegram
mosquitto_sub -t "alarms/#"
mosquitto_pub -t "alarms/test" -m '{"type": "test_alarm", "severity": "high"}'
sudo systemctl status mosquitto
sudo systemctl status mqtt_telegram
View logs:
journalctl -u mqtt_telegram -f
{
"type": "temperature_alert",
"severity": "high",
"value": "35.5",
"unit": "C"
}