-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathbot.py
More file actions
74 lines (63 loc) · 2.45 KB
/
bot.py
File metadata and controls
74 lines (63 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
import datetime
import aioschedule
import pytz
from pyrogram import Client
from pyrogram.errors import FloodWait
from config import (API_HASH, API_ID, BOT_ADMIN_IDS, BOT_LIST,
CHANNEL_OR_GROUP_ID, CHECK_DELAY, MESSAGE_ID,
SESSION_STRING, TIME_ZONE)
app = Client(
'BOT',
API_ID,
API_HASH,
session_string=SESSION_STRING,
in_memory=True
)
async def bot_check(bot_username):
"""Checks if bot is online or not
Args:
bot_username (str): bot username to check
Returns:
str: bot status
"""
try:
checker_status = await app.send_message(bot_username, "/start")
first_message_id = checker_status.id
await asyncio.sleep(5)
async for message in app.get_chat_history(bot_username, limit=1):
second_message_id = message.id
if first_message_id == second_message_id:
status = f"\n\n🤖 **Bot**: @{bot_username}\n🔴 Status: **OFF** ❌"
for bot_admin_id in BOT_ADMIN_IDS:
if bot_admin_id.isnumeric():
bot_admin_id = int(bot_admin_id)
try:
await app.send_message(bot_admin_id, f"🚨 **Notification** 🚨\n\n» @{bot_username} is **DEAD** ❌")
except Exception as e:
print(e)
else:
status = f"\n\n🤖 **Bot**: @{bot_username}\n🟢 Status: **ON** ✅"
await app.read_chat_history(bot_username)
return status
except FloodWait as e:
await asyncio.sleep(e.x)
async def status_checker():
message = f"💡 **Bots Status** 💡\n\n"
for bot in BOT_LIST:
message += await bot_check(bot)
time = datetime.datetime.now(pytz.timezone(f"{TIME_ZONE}"))
last_update = time.strftime("%d %b %Y at %I:%M %p")
message += f"\n\n🛂 Last Check: {last_update} ({TIME_ZONE})\n\n🟡 **It will be updated every {CHECK_DELAY} Seconds ({int(CHECK_DELAY/60)} Minutes)**"
await app.edit_message_text(int(CHANNEL_OR_GROUP_ID), MESSAGE_ID, message)
print(f"Last Check: {last_update}")
async def main():
await app.start()
info = await app.get_me()
print(f'[INFO] @{info.username} STARTED!')
await status_checker()
aioschedule.every(CHECK_DELAY).seconds.do(status_checker)
while True:
await aioschedule.run_pending()
await asyncio.sleep(10)
asyncio.get_event_loop().run_until_complete(main())