-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsupervised_worker.py
More file actions
66 lines (49 loc) · 1.78 KB
/
supervised_worker.py
File metadata and controls
66 lines (49 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""A tiny daemon you can drive from Telegram.

Usage:

    export TELEGRAM_TOKEN=123:ABC
    export TELEGRAM_CHAT_ID=987654321
    python examples/supervised_worker.py

Then, in your chat, send: /status /pause /resume /stop /help

The worker also pushes an alert on every 10th item and a throttled heartbeat.
"""
import asyncio
import os
from telegram_ctl import Controller, Notifier
class Worker:
    """Mutable state shared by the work loop and the Telegram command handlers.

    Attributes are plain and public: the loop and the handlers read and
    write them directly.
    """

    def __init__(self) -> None:
        # Set once to request shutdown; the work loop polls it each iteration.
        self.stop: asyncio.Event = asyncio.Event()
        # While True the loop keeps running but takes no new items.
        self.paused: bool = False
        # Count of items handled so far.
        self.processed: int = 0
async def work_loop(
    w: Worker,
    notify: Notifier,
    *,
    interval: float = 0.5,
    alert_every: int = 10,
    heartbeat_every: float = 3600,
) -> None:
    """Process one simulated item per tick until ``w.stop`` is set.

    Each iteration, unless ``w.paused`` is true, increments ``w.processed``
    and sends an alert whenever the counter is a multiple of ``alert_every``.
    A throttled heartbeat is requested on every iteration, paused or not.

    Args:
        w: Shared worker state (``processed``, ``paused``, ``stop``).
        notify: Object providing ``send(text)`` and
            ``throttled(key, text, every=...)``.
        interval: Seconds to wait between iterations.
        alert_every: Send an alert on every N-th processed item.
        heartbeat_every: Value forwarded as ``every`` to ``notify.throttled``
            (presumably a minimum gap in seconds — confirm against
            telegram_ctl).

    Raises:
        ValueError: If ``alert_every`` is not a positive integer.
    """
    if alert_every < 1:
        raise ValueError("alert_every must be a positive integer")

    while not w.stop.is_set():
        if not w.paused:
            w.processed += 1
            if w.processed % alert_every == 0:
                notify.send(f"✅ processed {w.processed} items")
        notify.throttled(
            "heartbeat",
            f"💓 alive, processed={w.processed}",
            every=heartbeat_every,
        )
        # Wait on the stop event rather than sleeping blindly, so a stop
        # request ends the loop immediately instead of after a full tick.
        try:
            await asyncio.wait_for(w.stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # Normal case: the tick elapsed without a stop request.
            pass
async def main():
    """Wire up the worker, notifier and Telegram controller, then run them.

    Registers the ``status``, ``pause``, ``resume`` and ``stop`` commands on
    the controller, sends a start-up notification, and runs the work loop
    and the controller concurrently until both finish.

    Raises:
        SystemExit: If ``TELEGRAM_TOKEN`` or ``TELEGRAM_CHAT_ID`` is unset
            or empty.
    """
    # Check both variables named in the error message, not just the token.
    # NOTE(review): Notifier()/Controller() take no arguments here, so they
    # presumably read these from the environment — confirm against telegram_ctl.
    required = ("TELEGRAM_TOKEN", "TELEGRAM_CHAT_ID")
    if not all(os.getenv(name) for name in required):
        raise SystemExit("set TELEGRAM_TOKEN and TELEGRAM_CHAT_ID first")

    w = Worker()
    notify = Notifier()
    ctl = Controller()

    # Each handler's returned string is presumably sent back as the chat
    # reply — confirm against telegram_ctl.

    @ctl.command("status", help="current worker state")
    def status(args):
        state = "paused" if w.paused else "running"
        return f"📊 {state} — processed {w.processed}"

    @ctl.command("pause", help="stop taking new work")
    def pause(args):
        w.paused = True
        return "⏸ paused"

    @ctl.command("resume", help="resume work")
    def resume(args):
        w.paused = False
        return "▶️ resumed"

    @ctl.command("stop", help="graceful shutdown")
    def stop(args):
        # Setting the event ends work_loop; it is also handed to ctl.run below.
        w.stop.set()
        return "🛑 stopping"

    notify.send("🚀 worker started — /help for commands")
    await asyncio.gather(work_loop(w, notify), ctl.run(w.stop))
if __name__ == "__main__":
    # Script entry point: start the daemon only when this file is executed
    # directly, so importing the module does not run anything.
    asyncio.run(main())