-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
478 lines (427 loc) · 16.9 KB
/
bot.py
File metadata and controls
478 lines (427 loc) · 16.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
"""
Serves as the initial file to launch the bot. Loads all needed extensions and maintains
core functionality.
"""
from __future__ import annotations
import asyncio
import datetime
import logging
import logging.handlers
import re
import subprocess
import traceback
import uuid
from typing import TYPE_CHECKING, Any, Literal
import aiohttp
import discord
from beanie import init_beanie
from discord import app_commands
from discord.ext import commands
from motor.motor_asyncio import AsyncIOMotorClient
from rich.logging import RichHandler
import src.mongo.models
from commandchecks import is_staff_from_ctx
from env import env
from src.discord.globals import (
CHANNEL_BOTSPAM,
CHANNEL_DELETEDM,
CHANNEL_DMLOG,
CHANNEL_EDITEDM,
CHANNEL_RULES,
)
from src.discord.reporter import Reporter
if TYPE_CHECKING:
from src.discord.censor import Censor
from src.discord.logger import Logger
from src.discord.spam import SpamManager
intents = discord.Intents.all()
logger = logging.getLogger(__name__)
BOT_PREFIX = "?" if env.dev_mode else "!"
class PiBotCommandTree(app_commands.CommandTree):
def __init__(self, client: PiBot):
super().__init__(client)
async def on_error(
self,
interaction: discord.Interaction,
error: app_commands.AppCommandError,
) -> None:
# Optional delay for some commands
delay = None
# Handle check failures
if isinstance(error, app_commands.NoPrivateMessage):
message = (
"Sorry, but this command does not work in private message. "
"Please hop on over to the Scioly.org server to use the command!"
)
elif isinstance(error, app_commands.MissingRole | app_commands.MissingAnyRole):
message = "Sorry, you don't have the needed role to run this command."
elif isinstance(error, app_commands.MissingPermissions):
message = (
"Sorry, but you aren't allotted the proper permissions "
"needed by this command."
)
elif isinstance(error, app_commands.BotMissingPermissions):
message = (
"Oh no! I can't run this command right now. Please contact a developer."
)
elif isinstance(error, app_commands.CommandOnCooldown):
next_time = discord.utils.utcnow() + datetime.timedelta(
seconds=error.retry_after,
)
message = (
"Time to _chill out_ - this command is on cooldown! "
f"Please try again **{discord.utils.format_dt(next_time, 'R')}.**"
"\n\n"
"For future reference, this command is currently limited to "
f"being executed **{error.cooldown.rate} times every {error.cooldown.per} seconds**."
)
delay = error.retry_after - 1 if error.retry_after > 1 else 0
# Handle general app command errors
elif isinstance(error, app_commands.CommandLimitReached):
message = (
"Oh no! I've reached my max command limit. Please contact a developer."
)
elif isinstance(error, app_commands.CommandInvokeError):
message = "This command experienced a general error."
# Report error to staff
reporter_cog = self.client.get_cog("Reporter")
assert isinstance(reporter_cog, Reporter)
await reporter_cog.create_command_error_report(
error.original,
interaction.command,
)
elif isinstance(error, app_commands.TransformerError):
message = "This command experienced a transformer error."
elif isinstance(error, app_commands.CommandAlreadyRegistered):
message = "This command was already registered."
elif isinstance(error, app_commands.CommandSignatureMismatch):
message = (
"This command is currently out of sync. Please contact a developer."
)
elif isinstance(error, app_commands.CommandNotFound):
message = "Unfortunately, this command could not be found."
elif isinstance(error, app_commands.MissingApplicationID):
message = "This application needs an application ID."
elif isinstance(error, app_commands.CheckFailure):
message = "You are not allowed to run this command here. Try running it in `#bot-spam` instead."
else:
message = "Ooops, there was a command error."
await interaction.response.defer(ephemeral=True)
msg = await interaction.followup.send(message, ephemeral=True, wait=True)
if delay is not None:
await msg.delete(delay=delay)
class PiBot(commands.Bot):
"""
The bot itself. Controls all functionality needed for core operations.
"""
session: aiohttp.ClientSession | None
mongo_client: AsyncIOMotorClient
settings: src.mongo.models.Settings
def __init__(self):
super().__init__(
command_prefix=BOT_PREFIX,
case_insensitive=True,
intents=intents,
help_command=None,
tree_cls=PiBotCommandTree,
)
self.listeners_: dict[
str,
dict[str, Any],
] = {} # name differentiation between internal _listeners attribute
self.__version__ = self.get_version()
if not self.__version__:
self.__version__ = f"dev-{self.get_commit() or 'unknown'}"
self.session = None
self.mongo_client = AsyncIOMotorClient(
env.mongo_url,
tz_aware=True,
)
def get_version(self) -> str | None:
if env.version or not env.dev_mode:
return env.version
def get_commit(self) -> str | None:
with subprocess.Popen(
["git", "rev-parse", "--short", "HEAD"],
stdout=subprocess.PIPE,
) as proc:
if proc.stdout:
hash = proc.stdout.read()
commit = hash.decode("utf-8")
return commit.strip("\n")
return None
async def setup_hook(self) -> None:
"""
Called when the bot is being setup. Currently sets up a connection to the
database and initializes all extensions.
"""
await init_beanie(
database=self.mongo_client["data"],
document_models=[
src.mongo.models.Cron,
src.mongo.models.Ping,
src.mongo.models.Tag,
src.mongo.models.Invitational,
src.mongo.models.Event,
src.mongo.models.Censor,
src.mongo.models.Settings,
src.mongo.models.UserRoles,
# TODO
],
)
extensions = (
"src.discord.censor",
"src.discord.ping",
"src.discord.staffcommands",
"src.discord.staff.invitationals",
"src.discord.staff.censor",
"src.discord.staff.tags",
"src.discord.staff.events",
"src.discord.staff.usercleanup",
"src.discord.embed",
"src.discord.membercommands",
"src.discord.devtools",
"src.discord.funcommands",
"src.discord.tasks",
"src.discord.spam",
"src.discord.reporter",
"src.discord.logger",
"src.discord.rolerestore",
)
for i, extension in enumerate(extensions):
try:
await self.load_extension(extension)
logger.info(f"Enabled extension: {extension} {i + 1}/{len(extensions)}")
except commands.ExtensionError:
logger.error(f"Failed to load extension {extension}!")
traceback.print_exc()
async def on_ready(self) -> None:
"""
Called when the bot is enabled and ready to be run.
"""
# try:
# await self.tree.sync(guild=discord.Object(749057176756027414))
# except:
# import traceback
# traceback.print_exc()
logger.info(f"{self.user} has connected!")
# Add message to rules channel
server = self.get_guild(env.dev_server_id)
assert isinstance(server, discord.Guild)
rules_channel = discord.utils.get(server.text_channels, name=CHANNEL_RULES)
assert isinstance(rules_channel, discord.TextChannel)
rules_message = [m async for m in rules_channel.history(limit=1)]
if rules_message:
rules_message = rules_message[0]
view = discord.ui.View()
view.add_item(
discord.ui.Button(
url="https://scioly.org/rules",
label="Complete Scioly.org rules",
style=discord.ButtonStyle.link,
),
)
await rules_message.edit(view=view)
async def on_message(self, message: discord.Message) -> None:
# Nothing needs to be done to the bot's own messages
if message.author.bot:
return
# If user is being listened to, return their message
for listener in self.listeners_.items():
if message.author.id == listener[1]["follow_id"]:
listener[1]["message"] = message
# Log incoming direct messages
if isinstance(message.channel, discord.DMChannel) and message.author != bot:
logger_cog: commands.Cog | Logger = self.get_cog("Logger")
await logger_cog.send_to_dm_log(message)
logger.info(
f"Message from {message.author} through DM's: {message.content}",
)
else:
# Print to output
if not (
message.author.bot
and message.channel.name
in [CHANNEL_EDITEDM, CHANNEL_DELETEDM, CHANNEL_DMLOG]
):
# avoid sending logs for messages in log channels
logger.info(
f"Message from {message.author} in #{message.channel}: {message.content}",
)
# Check if the message contains a censored word/emoji
is_private = isinstance(
message.channel,
discord.DMChannel | discord.GroupChannel,
)
if message.content and not is_private:
censor: commands.Cog | Censor = self.get_cog("Censor")
await censor.on_message(message)
# Check to see if the message contains repeated content or has too many caps
spam: commands.Cog | SpamManager = self.get_cog("SpamManager")
await spam.store_and_validate(message)
legacy_command: list[str] = re.findall(
rf"^{re.escape(BOT_PREFIX)}\s*(\w+)",
message.content,
)
if message.content and len(legacy_command):
if legacy_command[0] == sync.name or legacy_command[0].startswith(
f"{sync.name} ",
):
await bot.process_commands(message)
return
botspam_channel = discord.utils.get(
message.guild.channels,
name=CHANNEL_BOTSPAM,
)
reply_message = await message.reply(
f"Hola {message.author.mention}, please use slash commands now! Try typing `/` in {botspam_channel.mention}!",
)
await reply_message.delete(delay=10)
async def start(self, token: str, *, reconnect: bool = True) -> None:
self.session = aiohttp.ClientSession()
await super().start(token=token, reconnect=reconnect)
async def close(self) -> None:
if self.session:
await self.session.close()
await super().close()
async def listen_for_response(
self,
follow_id: int,
timeout: int,
) -> discord.Message | None:
"""
Creates a global listener for a message from a user.
Args:
follow_id: the user ID to create the listener for
timeout: the amount of time to wait before returning None, assuming
the user abandoned the operation
Returns:
the found message or None
"""
my_id = str(uuid.uuid4())
self.listeners_[my_id] = {
"follow_id": follow_id,
"timeout": timeout,
"message": None,
}
count = timeout
while count > 0:
await asyncio.sleep(1)
count -= 1
if self.listeners_[my_id]["message"] is not None:
message = self.listeners_[my_id]["message"]
del self.listeners_[my_id]
return message
return None
async def sync_commands(
self,
only_guild_commands: bool,
) -> tuple[list[app_commands.AppCommand], dict[int, list[app_commands.AppCommand]]]:
logger.info(
f"Beginning to sync {'guild-only' if only_guild_commands else 'all'} commands ...",
)
global_cmds_synced = []
if not only_guild_commands:
global_cmds_synced = await self.tree.sync()
logger.info(f"{len(global_cmds_synced)} global commands were synced")
guild_commands = {}
for command_guild in env.slash_command_guilds:
guild_cmds_synced = await self.tree.sync(
guild=discord.Object(id=command_guild),
)
guild_commands[command_guild] = guild_cmds_synced
logger.info(
f"{len(guild_cmds_synced)} guild commands were synced for guild with id {command_guild}",
)
return (global_cmds_synced, guild_commands)
bot = PiBot()
KB = 1024
MB = 1024 * KB
handler = logging.handlers.RotatingFileHandler(
filename="pibot.log",
encoding="utf-8",
maxBytes=32 * MB,
backupCount=5,
)
discord.utils.setup_logging(handler=handler)
@bot.command(
description="Syncs command list. Any new commands will be available for use.",
help="The command has an optional second argument to state whether to sync all or only guild commands. Please specify it by either mentioning {}.",
)
@commands.cooldown(1, 60, commands.BucketType.guild)
@commands.check(is_staff_from_ctx)
async def sync(ctx: commands.Context, sync_type: Literal["all", "guild"] = "all"):
"""
Syncs and registers and new commands with Discord. This command is a
top-level command to prevent disabled cogs from disabling sync
functionality.
"""
match sync_type:
case "all":
only_guild_commands = False
case "guild":
only_guild_commands = True
async with ctx.typing(ephemeral=True):
global_cmds_synced, guild_cmds_synced = await bot.sync_commands(
only_guild_commands,
)
res_msg = (
f"{len(global_cmds_synced)} global commands were synced."
if not only_guild_commands
else ""
)
for guild_id, cmds in guild_cmds_synced.items():
guild_info = bot.get_guild(guild_id)
guild_name_id = f"guild with id {guild_id}"
if guild_info:
guild_name_id = f'"{guild_info.name}" (id: {guild_id})'
res_msg += f"\n{len(cmds)} guild commands were synced in {guild_name_id}."
res_msg = res_msg.strip("\n")
await ctx.send(res_msg)
@sync.error
async def sync_error(ctx: commands.Context, err: commands.CommandError):
logging.error(err)
if isinstance(err, commands.CommandOnCooldown):
return await ctx.send(
"`{}{}` is on cooldown since the API endpoint for syncing commands to Discord is "
"ratelimited. Please try again in {:.2f} seconds.".format(
BOT_PREFIX,
sync.name,
err.retry_after,
),
)
if isinstance(err, commands.MissingAnyRole):
return await ctx.send(str(err))
if isinstance(err, commands.BadLiteralArgument):
literals = list(err.literals)
for i, literal in enumerate(literals):
literals[i] = f"`{literal}`"
literals[-1] = f"or {literals[-1]}"
assert sync.help is not None
msg = sync.help.format((", " if len(literals) > 2 else " ").join(literals))
return await ctx.send(msg)
await ctx.send(f"An unknown error occurred when syncing: {err}")
async def main(token: str):
"""
Main event loop for the bot.
Args:
token (str): The bot token.
"""
async with bot:
await bot.start(token=token)
if __name__ == "__main__":
logger = logging.getLogger()
if env.dev_mode:
logger.level = logging.DEBUG
logger.addHandler(RichHandler(level=logging.DEBUG, rich_tracebacks=True))
else:
logger.level = logging.INFO
streamHandler = logging.StreamHandler()
streamHandler.setFormatter(
logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
"%Y-%m-%d %H:%M:%S",
),
)
logger.addHandler(streamHandler)
asyncio.run(main(env.discord_token))