-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
131 lines (106 loc) · 3.81 KB
/
main.py
File metadata and controls
131 lines (106 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# SPDX-License-Identifier: MPL-2.0
import contextlib
import logging
from typing import override
import asqlite
import asyncio
import bot
import discord
import os
import re
from bot import bot_owner_id, client, tree, database_required
from src import publisher
from logging.handlers import RotatingFileHandler
@tree.command()
@bot.is_me()
async def sync(interaction: discord.Interaction):
"""Sync the commands
"""
synced = await tree.sync()
await interaction.response.send_message(f"{len(synced)} command(s) has been sync")
@client.event
async def on_message(message: discord.Message) -> None:
if (message.author.id != bot_owner_id):
return
bot_user = client.user
if not bot_user:
return
pattern = f"<@(!?){bot_user.id}> sync"
if re.fullmatch(pattern, message.content):
synced = await tree.sync()
await message.reply(f"{len(synced)} command(s) has been sync")
@client.event
async def on_ready() -> None:
log = logging.getLogger()
log.info(f"Logged in as {client.user}")
class RemoveNoise(logging.Filter):
def __init__(self):
super().__init__(name='discord.state')
@override
def filter(self, record: logging.LogRecord) -> bool:
if record.levelname == 'WARNING' and 'referencing an unknown' in record.msg:
return False
return True
@contextlib.contextmanager
def setup_logging():
log = logging.getLogger()
try:
discord.utils.setup_logging()
# __enter__
max_bytes = 32 * 1024 * 1024 # 32 MiB
logging.getLogger('discord').setLevel(logging.INFO)
logging.getLogger('discord.http').setLevel(logging.WARNING)
logging.getLogger('discord.state').addFilter(RemoveNoise())
log.setLevel(logging.INFO)
handler = RotatingFileHandler(filename='data/onion.log', encoding='utf-8', mode='w', maxBytes=max_bytes, backupCount=5)
dt_fmt = '%Y-%m-%d %H:%M:%S'
fmt = logging.Formatter('[{asctime}] [{levelname:<7}] {name}: {message}', dt_fmt, style='{')
handler.setFormatter(fmt)
log.addHandler(handler)
yield
finally:
# __exit__
handlers = log.handlers[:]
for hdlr in handlers:
hdlr.close()
log.removeHandler(hdlr)
@database_required
async def migrate_db_if_needed() -> None:
log = logging.getLogger()
current_version = (await bot.database.fetchone("SELECT user_version FROM pragma_user_version"))["user_version"]
migration_version = 0
to_execute: list[str] = []
for f in os.listdir("migrations"):
if not f.lower().endswith(".sql"):
continue
try:
migration_version = int(f.rstrip(".sql"))
except:
log.exception(f"Migration failed, unable to get version from {f}")
return
if migration_version <= current_version:
continue
to_execute.append(f)
if migration_version > current_version:
log.info(f"Migrating database from {current_version} to {migration_version}")
for f in to_execute:
await bot.database.executescript(open(f"migrations/{f}").read())
await bot.database.execute(f"PRAGMA user_version = {migration_version}")
async def start_bot() -> None:
bot.check_config()
await publisher.register_commands()
bot.database = await asqlite.connect("data/data.db")
await migrate_db_if_needed()
await client.login(bot.token)
await publisher.restart()
publisher.get_latest_schedule.start()
await client.connect()
def main() -> None:
with setup_logging():
asyncio.run(start_bot())
if __name__ == "__main__":
main()