Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docu/docs/plugin/telegram.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
|----|------------|-------|
|botToken|Der Api-Key des Telegram-Bots||
|chatIds|Liste mit Chat-Ids der Empfängers / der Emfänger-Gruppen||
|startup_message|Nachricht, dass das Telegram-Plugin erfolgreich geladen wurde|leer|
|message_fms|Format der Nachricht für FMS|`{FMS}`|
|message_pocsag|Format der Nachricht für Pocsag|`{RIC}({SRIC})\n{MSG}`|
|message_zvei|Format der Nachricht für ZVEI|`{TONE}`|
|message_msg|Format der Nachricht für MSG||
|queue|Aktivieren/Deaktivieren der MessageQueue|true|
|max_retries|Anzahl der Versuche, bis das Senden abgebrochen wird|5|
|initial_delay|Verzögerung des zweiten Sendeversuchs|2 [Sek.]|
|max_delay|Maximale Verzögerung|60 [Sek.]|

**Beispiel:**
```yaml
Expand All @@ -35,6 +38,7 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
res: telegram
config:
message_pocsag: "{RIC}({SRIC})\n{MSG}"
startup_message: "Server up and running!"
botToken: "BOT_TOKEN"
chatIds:
- "CHAT_ID"
Expand Down
234 changes: 155 additions & 79 deletions plugin/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,126 +10,202 @@
by Bastian Schroll

@file: telegram.py
@date: 20.02.2020
@author: Jan Speller
@description: Telegram Plugin
@date: 12.07.2025
@author: Claus Schichl nach der Idee von Jan Speller
@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
"""

import logging
import time
import threading
import queue
import requests
from plugin.pluginBase import PluginBase

# ###################### #
# Custom plugin includes #
from telegram.error import (TelegramError, Unauthorized, BadRequest, TimedOut, NetworkError)
from telegram.ext import messagequeue as mq
from telegram.utils.request import Request
import telegram.bot
# ###################### #
# Setup Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)


# ===========================
# TelegramSender-Klasse
# ===========================

class TelegramSender:
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
self.bot_token = bot_token
self.chat_ids = chat_ids
self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
self.msg_queue = queue.Queue()
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()

def send_message(self, text):
for chat_id in self.chat_ids:
self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0

def send_location(self, latitude, longitude):
for chat_id in self.chat_ids:
self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))

def _worker_loop(self):
delay = self.initial_delay

while True:
try:
msg_type, chat_id, content, retry_count = self.msg_queue.get()

success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)

if success:
delay = self.initial_delay

logging.debug("- %s loaded", __name__)
elif permanent_failure:
logger.error("Permanenter Fehler – Nachricht wird verworfen.")

elif retry_count >= self.max_retries:
logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.")

class MQBot(telegram.bot.Bot):
'''A subclass of Bot which delegates send method handling to MQ'''
else:
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))

def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs):
super(MQBot, self).__init__(*args, **kwargs)
# below 2 attributes should be provided for decorator usage
self._is_messages_queued_default = is_queued_def
self._msg_queue = mqueue or mq.MessageQueue()
# Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)

# Erhöhe Delay für den nächsten Versuch (exponentielles Backoff)
delay = min(delay * 2, self.max_delay)

except Exception as e:
logger.exception(f"Fehler im Telegram-Worker: {e}")
time.sleep(5)

def _send_to_telegram(self, msg_type, chat_id, content):
if msg_type == "text":
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': content
}
elif msg_type == "location":
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
payload = {
'chat_id': chat_id,
**content
}
else:
logger.error("Unbekannter Nachrichtentyp.")
return False, True, None # Unbekannter Typ = permanent falsch

def __del__(self):
try:
self._msg_queue.stop()
except:
pass
custom_delay = None # Standardwert für Rückgabe, außer bei 429

@mq.queuedmessage
def send_message(self, *args, **kwargs):
'''Wrapped method would accept new `queued` and `isgroup`
OPTIONAL arguments'''
return super(MQBot, self).send_message(*args, **kwargs)
response = requests.post(url, data=payload, timeout=10)

if response.status_code == 429:
custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.")
return False, False, custom_delay # Telegram gibt genaue Wartezeit vor

if response.status_code == 400:
logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.")
return False, True, custom_delay # Permanent fehlerhaft

if response.status_code == 401:
logger.critical("Ungültiger Bot-Token – bitte prüfen!")
return False, True, custom_delay # Permanent fehlerhaft

response.raise_for_status()
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
return True, False, custom_delay

except requests.RequestException as e:
logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
return False, False, custom_delay

class BoswatchPlugin(PluginBase):
r"""!Description of the Plugin"""

# ===========================
# BoswatchPlugin-Klasse
# ===========================


class BoswatchPlugin(PluginBase):
def __init__(self, config):
r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config'

def onLoad(self):
r"""!Called by import of the plugin"""
if self.config.get("queue", default=True):
q = mq.MessageQueue()
request = Request(con_pool_size=8)
self.bot = MQBot(token=self.config.get("botToken", default=""), request=request, mqueue=q)
print('queue')
else:
self.bot = telegram.Bot(token=self.config.get("botToken"))
print('normal')
bot_token = self.config.get("botToken")
chat_ids = self.config.get("chatIds", default=[])

if not bot_token or not chat_ids:
logger.error("botToken oder chatIds fehlen in der Konfiguration!")
return

# Konfigurierbare Parameter mit Fallback-Defaults
max_retries = self.config.get("max_retries")
initial_delay = self.config.get("initial_delay")
max_delay = self.config.get("max_delay")

self.sender = TelegramSender(
bot_token=bot_token,
chat_ids=chat_ids,
max_retries=max_retries,
initial_delay=initial_delay,
max_delay=max_delay
)

startup_message = self.config.get("startup_message")
if startup_message and startup_message.strip():
self.sender.send_message(startup_message)

def setup(self):
r"""!Called before alarm
Remove if not implemented"""
pass

def fms(self, bwPacket):
r"""!Called on FMS alarm

@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
self._sendMessage(msg)
self.sender.send_message(msg)

def pocsag(self, bwPacket):
r"""!Called on POCSAG alarm

@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
self._sendMessage(msg)
self.sender.send_message(msg)

if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
logging.debug("Found coordinates in packet")
(lat, lon) = (bwPacket.get("lat"), bwPacket.get("lon"))
self._sendLocation(lat, lon)
lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
logger.debug("Koordinaten gefunden – sende Standort.")
self.sender.send_location(lat, lon)

def zvei(self, bwPacket):
r"""!Called on ZVEI alarm

@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
self._sendMessage(msg)
self.sender.send_message(msg)

def msg(self, bwPacket):
r"""!Called on MSG packet

@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_msg"))
self._sendMessage(msg)
self.sender.send_message(msg)

def _sendMessage(self, message):
for chatId in self.config.get("chatIds", default=[]):
try:
# Send Message via Telegram
logging.info("Sending message to " + chatId)
self.bot.send_message(chat_id=chatId, text=message)

except Unauthorized:
logging.exception("Error while sending Telegram Message, please Check your api-key")
except (TimedOut, NetworkError):
logging.exception("Error while sending Telegram Message, please Check your connectivity")
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
def teardown(self):
r"""!Called after alarm
Remove if not implemented"""
pass

def _sendLocation(self, lat, lon):
for chatId in self.config.get("chatIds", default=[]):
try:
# Send Location via Telegram
if lat is not None and lon is not None:
logging.info("Sending location to " + chatId)
self.bot.sendLocation(chat_id=chatId, latitude=lat, longitude=lon)

except Unauthorized:
logging.exception("Error while sending Telegram Message, please Check your api-key")
except (TimedOut, NetworkError):
logging.exception("Error while sending Telegram Message, please Check your connectivity")
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
def onUnload(self):
r"""!Called by destruction of the plugin
Remove if not implemented"""
pass