# replicator.py
# Forwards messages from configured Discord source channels (and their threads) to webhooks.
import discord
import aiohttp
import json
import re
from collections import deque
from itertools import cycle
# ======================= CREDENTIALS =======================
# Token handed to `client.run()`; replace the placeholder before running.
TOKEN = 'TOKEN_GOES_HERE'

# ========================= ROUTING =========================
# Source channel ID -> route tag.
# Messages seen in these channels are forwarded along the tagged route.
SOURCE_CHANNELS = {
    1234567890123456789: 'RouteTag1',
}

# Route tag -> list of webhook URLs that messages on that route are sent to.
WEBHOOK_ROUTES = {
    'RouteTag1': [
        'https://discord.com/api/webhooks/...',
        'https://discord.com/api/webhooks/...',
    ],
}
# ===========================================================
class SelfBot(discord.Client):
    """Discord client that mirrors messages from source channels to webhooks.

    A message posted in a channel listed in ``SOURCE_CHANNELS`` (or in a
    thread whose parent channel is listed there) is re-posted through one of
    the webhook URLs configured for that channel's route tag in
    ``WEBHOOK_ROUTES``. Within a route, the webhook in use advances to the
    next URL whenever the author differs from the previous forwarded
    message's author, or when the current webhook answers with HTTP 429.
    """

    # Maximum number of characters Discord accepts in a webhook's `content`.
    MAX_CONTENT_LENGTH = 2000

    # Matches user mention tokens such as `<@123>` and `<@!123>`.
    _MENTION_PATTERN = re.compile(r'<@!?(\d+)>')

    def __init__(self, *args, **kwargs):
        """Initialise per-route webhook rotation state from ``WEBHOOK_ROUTES``.

        All arguments are passed through to ``discord.Client``.
        """
        super().__init__(*args, **kwargs)
        # Created lazily inside the running event loop (see _ensure_session).
        self.session = None
        # route tag -> endless iterator over that route's webhook URLs
        self.webhook_iterators = {}
        # route tag -> {'last_user_id': ..., 'current_url': ...}
        self.route_states = {}
        # IDs of recently forwarded source-channel messages (bounded history).
        self.forwarded_messages = deque(maxlen=10000)
        for tag, urls in WEBHOOK_ROUTES.items():
            if urls:
                self.webhook_iterators[tag] = cycle(urls)
                self.route_states[tag] = {'last_user_id': None, 'current_url': None}
            else:
                print(f"Warning: Route '{tag}' has no webhooks configured.")

    def _ensure_session(self):
        """Return the shared HTTP session, creating it if missing or closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def on_ready(self):
        """Make sure the HTTP session exists and log the monitoring setup."""
        # on_ready may fire more than once (e.g. after a reconnect); reuse the
        # existing session instead of replacing it and leaking the old one.
        self._ensure_session()
        print(f'Logged in as {self.user} (ID: {self.user.id})')
        print(f'Monitoring {len(SOURCE_CHANNELS)} channels.')
        print(f'Active Routes: {list(self.webhook_iterators.keys())}')

    def _get_route_info(self, message):
        """Determine routing and thread context for a message.

        Returns ``(route_tag, thread_name, is_thread_starter)``, or ``None``
        if the message should be ignored. ``thread_name`` is ``None`` for
        messages posted directly in a source channel.
        """
        channel = message.channel
        if isinstance(channel, discord.Thread):
            parent_id = channel.parent_id
            if parent_id not in SOURCE_CHANNELS:
                return None
            # Skip empty thread system messages
            if not (message.content or message.attachments or message.embeds or message.stickers):
                return None
            route_tag = SOURCE_CHANNELS[parent_id]
            # NOTE(review): forwarded_messages holds IDs of source-channel
            # messages, and this compares the thread message's own ID against
            # them. If the intent is "thread created from a forwarded
            # message", `channel.id` may have been meant here - confirm.
            is_thread_starter = message.id in self.forwarded_messages
            if is_thread_starter:
                self.forwarded_messages.remove(message.id)
            return (route_tag, channel.name, is_thread_starter)
        if channel.id in SOURCE_CHANNELS:
            # Remember the ID so a later thread message can be matched to it.
            self.forwarded_messages.append(message.id)
            return (SOURCE_CHANNELS[channel.id], None, False)
        return None

    def _sanitize_mentions(self, message):
        """Return the message content with readable names added to mentions.

        Each user mention token whose ID belongs to a user in
        ``message.mentions`` is kept and followed by `` (@name)``, where the
        name is the user's ``nick`` when set, otherwise ``name``. Other
        mention tokens are left untouched.
        """
        content = message.content
        if not message.mentions:
            return content
        mention_map = {
            user.id: getattr(user, 'nick', None) or user.name
            for user in message.mentions
        }

        def replace_mention(match):
            user_id = int(match.group(1))
            if user_id in mention_map:
                return f"{match.group(0)} (@{mention_map[user_id]})"
            return match.group(0)

        return self._MENTION_PATTERN.sub(replace_mention, content)

    def _get_display_name(self, author):
        """Return ``"nick (@name)"`` when the author has a nick, else ``name``."""
        real_name = author.name
        nick = getattr(author, 'nick', None)
        if nick:
            return f"{nick} (@{real_name})"
        return real_name

    def _build_payload(self, message, thread_name, is_thread_starter):
        """Build the webhook JSON payload (content, username, avatar, embeds)."""
        content = self._sanitize_mentions(message)
        # Add thread context
        if is_thread_starter and thread_name:
            content = f"> [Thread Started: {thread_name}]\n{content}"
        elif thread_name:
            content = f"> [Thread: {thread_name}]\n{content}"
        # Add sticker info
        if message.stickers:
            sticker_text = '\n'.join(f'Sticker: {s.name}' for s in message.stickers)
            content = f"{content}\n{sticker_text}" if content else sticker_text
        return {
            'content': content,
            'username': self._get_display_name(message.author),
            'avatar_url': str(message.author.avatar.url) if message.author.avatar else None,
            'embeds': [embed.to_dict() for embed in message.embeds],
        }

    async def _add_reply_context(self, message, payload):
        """Prefix ``payload['content']`` with a preview of the replied-to message.

        Does nothing when the message is not a reply. The referenced message
        is fetched from the channel; any failure is logged and the payload is
        left unchanged (best effort).
        """
        if not message.reference:
            return
        try:
            ref_msg = await message.channel.fetch_message(message.reference.message_id)
            clean_content = ref_msg.content.replace('\n', ' ')
            # Keep the quoted preview to a single short line.
            preview = clean_content[:100] + "..." if len(clean_content) > 100 else clean_content
            payload['content'] = f'> Replying to {ref_msg.author.name}: {preview}\n{payload["content"]}'
        except Exception as e:
            print(f"Failed to fetch referenced message: {e}")

    def _truncate_content(self, content):
        """Clip ``content`` to ``MAX_CONTENT_LENGTH`` characters, ending in ``...``."""
        limit = self.MAX_CONTENT_LENGTH
        if len(content) <= limit:
            return content
        # NOTE(review): len() counts code points; Discord may count some
        # characters differently - confirm if over-limit rejections persist.
        return content[:limit - 3] + '...'

    def _get_webhook_url(self, route_tag):
        """Return the webhook URL currently selected for a route."""
        return self.route_states[route_tag]['current_url']

    def _update_webhook_for_user(self, route_tag, user_id):
        """Advance to the route's next webhook if the author changed.

        Also selects the first webhook when none has been chosen yet.
        """
        state = self.route_states[route_tag]
        if state['current_url'] is None or state['last_user_id'] != user_id:
            state['current_url'] = next(self.webhook_iterators[route_tag])
            state['last_user_id'] = user_id

    def _rotate_webhook(self, route_tag):
        """Switch a route to its next webhook URL, if the route is configured."""
        state = self.route_states.get(route_tag)
        iterator = self.webhook_iterators.get(route_tag)
        if state is not None and iterator is not None:
            state['current_url'] = next(iterator)

    async def on_message(self, message):
        """Forward a message from a monitored channel or thread to its route."""
        route_info = self._get_route_info(message)
        if not route_info:
            return
        route_tag, thread_name, is_thread_starter = route_info
        if route_tag not in self.webhook_iterators:
            print(f"Error: No configured webhooks for route '{route_tag}'")
            return
        payload = self._build_payload(message, thread_name, is_thread_starter)
        # Add reply context for non-thread messages
        if not thread_name:
            await self._add_reply_context(message, payload)
        # A webhook message needs content, an embed or a file; a payload with
        # none of them would only be rejected, so skip it.
        if not (payload['content'] or payload['embeds'] or message.attachments):
            return
        # Added prefixes can push the text over the webhook content limit.
        payload['content'] = self._truncate_content(payload['content'])
        self._update_webhook_for_user(route_tag, message.author.id)
        try:
            await self._send_webhook(self._get_webhook_url(route_tag), payload, message.attachments, route_tag)
        except Exception as e:
            print(f"Error on route {route_tag}: {e}")

    async def _send_webhook(self, webhook_url, payload, attachments, route_tag):
        """Send a message to a webhook, handling attachments if present.

        With attachments the payload is sent as multipart form data
        (``payload_json`` plus one ``files[n]`` part per attachment);
        otherwise it is posted as JSON.
        """
        # The session may not exist yet if a message arrives before on_ready.
        session = self._ensure_session()
        if attachments:
            form = aiohttp.FormData()
            form.add_field('payload_json', json.dumps(payload))
            for index, attachment in enumerate(attachments):
                file_data = await attachment.read()
                # Each file needs its own field name so that none is dropped.
                form.add_field(
                    f'files[{index}]',
                    file_data,
                    filename=attachment.filename,
                    content_type=attachment.content_type,
                )
            async with session.post(webhook_url, data=form) as resp:
                await self._handle_webhook_response(resp, route_tag)
        else:
            async with session.post(webhook_url, json=payload) as resp:
                await self._handle_webhook_response(resp, route_tag)

    async def _handle_webhook_response(self, response, route_tag):
        """Log webhook errors; on HTTP 429 move the route to its next webhook."""
        if response.status == 429:
            print(f"Rate limited on route {route_tag}. Skipping to next hook.")
            # Actually perform the skip the log line announces, so following
            # messages on this route use a different webhook.
            self._rotate_webhook(route_tag)
        elif response.status not in (200, 204):
            print(f'Webhook failed on route {route_tag}: {response.status}')

    async def close(self):
        """Close the HTTP session (if any), then close the Discord client."""
        try:
            if self.session:
                await self.session.close()
        finally:
            # Always shut the client down, even if closing the session fails.
            await super().close()
# Build the client with self_bot=True and intents=False, then start it with
# the configured token.
client = SelfBot(
    self_bot=True,
    intents=False,
)
client.run(TOKEN)