forked from BennoCrafter/PollMatrixBot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·143 lines (110 loc) · 4.26 KB
/
main.py
File metadata and controls
executable file
·143 lines (110 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from typing import Optional
from src.bot_instance import get_bot, initialize_bot
from pathlib import Path
import os
from dotenv import load_dotenv
from nio.events.room_events import ReactionEvent, RoomMessageText
from nio.responses import DirectRoomsErrorResponse, RoomCreateError
from nio.rooms import MatrixRoom
import simplematrixbotlib as botlib
import datetime
from src.command_system import register_command_from_path
from src.commands.command import Command
from src.poll import Poll
from src.utils.load_config import load_config
from src.utils.get_quantity_number import get_quantity_number
from src.utils.load_file import load_file
from src.utils.logging_config import setup_logger
from src.utils.once_decorator import once
from src.poll_manager import PollManager
# Module-level logger for this script.
logger = setup_logger(__name__)

# Load environment variables
load_dotenv()

config = load_config("assets/config.yaml")
session_file_path = Path(config["session_file"])

# Remove the session file when the configuration asks for it.
# unlink(missing_ok=True) replaces the former exists()-then-unlink() pair:
# it is a single operation, so a file that disappears between the check and
# the removal can no longer raise FileNotFoundError.
if config.get("delete_session_file_on_start"):
    session_file_path.unlink(missing_ok=True)

# Prefix handed to botlib.MessageMatch for every incoming message.
PREFIX = config["prefix"]

# Initialise the shared bot instance, then fetch it for use in this module.
initialize_bot()
bot = get_bot()
async def handle_message(match: botlib.MessageMatch, config):
    """Dispatch an incoming message to the first command that accepts it.

    Commands are tried in the order of the module-level ``commands`` list and
    the first one whose ``is_valid(match)`` is truthy is executed. When no
    command accepts the message and ``config["use_add_command"]`` is falsy,
    the message is passed to ``poll_manager.add_item`` instead; otherwise it
    is ignored.
    """
    selected: Command | None = next(
        (candidate for candidate in commands if candidate.is_valid(match)),
        None,
    )

    if selected is not None:
        await selected.execute(message=match)
        return

    # No command matched: fall back to adding an item unless the explicit
    # add command is enabled in the configuration.
    if not config["use_add_command"]:
        await poll_manager.add_item(match)
@bot.listener.on_message_event  # type: ignore
async def on_message(room: MatrixRoom, message: RoomMessageText) -> None:
    """Wrap an incoming room message in a MessageMatch and dispatch it.

    Messages for which ``is_not_from_this_bot()`` is falsy are dropped.
    """
    match = botlib.MessageMatch(room, message, bot, PREFIX)
    if not match.is_not_from_this_bot():
        return
    await handle_message(match, config)
@bot.listener.on_reaction_event  # type: ignore
async def on_reaction(room: MatrixRoom, reaction: ReactionEvent, k: str) -> None:
    """Reaction hook; intentionally a no-op for now."""
    return None
@bot.listener.on_startup  # type: ignore
@once
async def on_startup(w) -> None:
    """Log that the bot has started.

    The positional argument ``w`` is accepted but not used here.
    NOTE(review): ``@once`` presumably limits this handler to a single run;
    confirm against src.utils.once_decorator.
    """
    logger.info("Bot started successfully.")
    # Disabled example of sending a direct message on startup:
    # await send_private_dm("@bennowo:matrix.org", "Hello, I'm a bot. I can do some stuff.")
def get_all_extensions(for_path: Path) -> list[Path]:
    """Recursively collect the loadable ``.py`` files below ``for_path``.

    Sub-directories are descended into, except ``__pycache__``. A file is
    skipped when it does not end in ``.py``, when its name starts with an
    underscore, or when its stem is ``command``.

    Args:
        for_path: Directory to scan.

    Returns:
        The matching paths. Entries of each directory are visited in sorted
        order, so the result is the same on every run and every filesystem.

    Raises:
        OSError: Propagated from ``Path.iterdir`` (e.g. ``for_path`` does not
            exist or is not a directory).
    """
    paths: list[Path] = []
    # iterdir() yields entries in arbitrary order; sorting makes the order in
    # which extensions are discovered reproducible.
    for path in sorted(for_path.iterdir()):
        if path.is_dir() and path.stem != "__pycache__":
            paths.extend(get_all_extensions(path))
            continue
        # Anything that reaches this point and is not a plain, public
        # ``.py`` file (including the __pycache__ directory itself) is skipped.
        if path.suffix != ".py" or path.name.startswith("_") or path.stem == "command":
            continue
        paths.append(path)
    return paths
def load_commands(for_path: Path) -> list[Command]:
    """Register and load every command module found below ``for_path``.

    Each path returned by ``get_all_extensions`` is passed to
    ``register_command_from_path`` together with the list stored under the
    module's stem in ``config["commands"]`` (an empty list when absent).
    Paths for which registration returns ``None`` are dropped. Only after all
    registrations are done is ``load()`` called on each command, in order.
    """
    registered = (
        register_command_from_path(
            module_path, config["commands"].get(module_path.stem, [])
        )
        for module_path in get_all_extensions(for_path)
    )
    loaded: list[Command] = [cmd for cmd in registered if cmd is not None]

    for cmd in loaded:
        cmd.load()
    return loaded
async def create_direct_room(user_id: str) -> Optional[str]:
    """Create a room marked as direct and invite ``user_id`` to it.

    Returns:
        The new room's id, or ``None`` (after logging the error) when the
        server answers with a ``RoomCreateError``.
    """
    resp = await bot.async_client.room_create(is_direct=True, invite=[user_id])

    if not isinstance(resp, RoomCreateError):
        return resp.room_id

    logger.error(f"Could not create DM with {user_id}: {resp.message}")
    return None
async def send_private_dm(user_id: str, message: str) -> bool:
    """Send ``message`` to ``user_id`` in a direct room, creating one if needed.

    The first direct room already listed for ``user_id`` is reused; when none
    is listed, a new one is created via ``create_direct_room``.

    Args:
        user_id: Matrix id of the recipient.
        message: Text to send.

    Returns:
        ``False`` when listing the direct rooms fails or no room could be
        created. ``True`` once the message has been handed to
        ``bot.api.send_text_message``; the result of that call is not
        inspected.
    """
    resp = await bot.async_client.list_direct_rooms()
    if isinstance(resp, DirectRoomsErrorResponse):
        logger.error(f"Could not list direct rooms: {resp.message}")
        return False

    # An entry may be missing or hold an empty list; both mean "no usable
    # room yet", so neither case may index into the list.
    known_rooms = resp.rooms.get(user_id) or []
    if known_rooms:
        room_id = known_rooms[0]
    else:
        room_id = await create_direct_room(user_id)
        if not room_id:
            return False

    # The room must NOT be left before sending: a bot that is no longer a
    # member of the room cannot post into it.
    logger.info(resp.rooms)
    await bot.api.send_text_message(room_id, message)
    return True
# Script entry point.
if __name__ == "__main__":
    # poll_manager and commands are module-level globals read by
    # handle_message(), so both must be bound before bot.run() is called.
    poll_manager = PollManager()
    commands: list[Command] = load_commands(Path("src/commands"))
    logger.info("Starting bot...")
    bot.run()