-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbot.py
326 lines (277 loc) · 13.5 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
from __future__ import annotations
import asyncio
import io
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from importlib.metadata import version
from os import getenv
from os.path import split
from typing import Optional
from urllib.parse import urlparse
import aiohttp
import nextcord
import sqlobject as sql
from nextcord.ext import commands
from fred import config
from fred.cogs import crashes, mediaonly, webhooklistener, welcome, levelling
from fred.fred_commands import Commands, FredHelpEmbed
from fred.libraries import createembed, common, owo
class Bot(commands.Bot):
async def isAlive(self: Bot):
try:
self.logger.info("Healthcheck: Attempting DB fetch")
_ = config.Misc.get(1)
self.logger.info("Healthcheck: Creating user fetch")
coro = self.fetch_user(227473074616795137)
self.logger.info("Healthcheck: Executing user fetch")
await asyncio.wait_for(coro, timeout=5)
except Exception as e:
self.logger.error(f"Healthcheck failed: {e}")
return False
return True
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.isReady = False
self.logger = common.new_logger(self.__class__.__name__)
self.version = version("fred")
self.logger.info(f"Starting Fred v{self.version}")
self.setup_DB()
self.command_prefix = config.Misc.fetch("prefix")
self.setup_cogs()
FredHelpEmbed.setup()
self.owo = False
self.web_session: aiohttp.ClientSession = ...
self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor()
self.error_channel = int(chan) if (chan := config.Misc.fetch("error_channel")) else 748229790825185311
async def start(self, *args, **kwargs):
async with aiohttp.ClientSession() as session:
self.web_session = session
return await super().start(*args, **kwargs)
@staticmethod
def is_running():
return config.Misc.fetch("is_running")
async def on_ready(self):
await self.change_presence(activity=nextcord.Game(f"v{self.version}"))
self.isReady = True
self.logger.info(f"We have logged in as {self.user} with prefix {self.command_prefix}")
async def on_reaction_add(self, reaction: nextcord.Reaction, user: nextcord.User) -> None:
if not user.bot and reaction.message.author.bot and reaction.emoji == "❌":
self.logger.info(f"Removing my own message because {user.display_name} reacted with ❌.")
await reaction.message.delete()
def setup_DB(self):
self.logger.info("Connecting to the database")
user = getenv("FRED_SQL_USER")
password = getenv("FRED_SQL_PASSWORD")
host = getenv("FRED_SQL_HOST")
port = getenv("FRED_SQL_PORT")
dbname = getenv("FRED_SQL_DB")
uri = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
attempt = 1
while attempt < 6:
try:
connection = sql.connectionForURI(uri)
sql.sqlhub.processConnection = connection
config.migrate()
self.logger.debug("Applied migration.")
break
except sql.dberrors.OperationalError:
self.logger.error(f"Could not connect to the DB on attempt {attempt}")
attempt += 1
time.sleep(5)
else: # this happens if the loop is not broken by a successful connection
raise ConnectionError("Could not connect to the DB")
self.logger.info(f"Connected to the DB. Took {attempt} tries.")
def setup_cogs(self):
self.logger.info("Setting up cogs")
self.add_cog(Commands(self))
self.add_cog(webhooklistener.Githook(self))
self.add_cog(mediaonly.MediaOnly(self))
self.add_cog(crashes.Crashes(self))
self.add_cog(welcome.Welcome(self))
self.add_cog(levelling.Levelling(self))
self.logger.info("Successfully set up cogs")
@property
def MediaOnly(self) -> mediaonly.MediaOnly:
return self.get_cog("MediaOnly") # noqa
@property
def Crashes(self) -> crashes.Crashes:
return self.get_cog("Crashes") # noqa
@property
def Welcome(self) -> welcome.Welcome:
return self.get_cog("Welcome") # noqa
async def on_error(self, event_method: str, *args, **kwargs):
exc_type, value, tb = sys.exc_info()
if event_method == "on_message":
channel: nextcord.abc.GuildChannel | nextcord.DMChannel = args[0].channel
if isinstance(channel, nextcord.DMChannel):
if channel.recipient is not None:
channel_str = f"{channel.recipient}'s DMs"
else:
channel_str = "a DM"
else:
channel_str = f"{channel.guild.name}: `#{channel.name}` ({channel.mention})"
else:
channel_str = ""
fred_str = f"Fred v{self.version}"
error_meta = f"{exc_type.__name__} exception handled in `{event_method}` in {channel_str}"
full_error = f"\n{value}\n\n{''.join(traceback.format_tb(tb))}"
self.logger.error(f"{fred_str}\n{error_meta}\n{full_error}")
# error_embed = nextcord.Embed(colour=nextcord.Colour.red(), title=error_meta, description=full_error)
# error_embed.set_author(name=fred_str)
await self.get_channel(self.error_channel).send(f"**{fred_str}**\n{error_meta}\n```py\n{full_error}```")
async def githook_send(self, data: dict):
self.logger.info("Handling GitHub payload", extra={"data": data})
embed: nextcord.Embed | None = await createembed.github_embed(data)
if embed is None:
self.logger.info("Non-supported Payload received")
else:
self.logger.info("GitHub payload was supported, sending an embed")
channel = self.get_channel(config.Misc.fetch("githook_channel"))
await channel.send(content=None, embed=embed)
async def send_DM(
self,
user: nextcord.User | nextcord.Member,
content: str = None,
embed: nextcord.Embed = None,
user_meta: config.Users = None,
in_dm: bool = False,
**kwargs,
) -> bool:
if self.owo:
if content is not None:
content = owo.owoize(content)
if embed is not None:
embed.title = owo.owoize(embed.title)
embed.description = owo.owoize(embed.description)
# don't do the fields because those are most often literal command names, like in help
self.logger.info("Sending a DM", extra=common.user_info(user))
if not user_meta:
user_meta = config.Users.create_if_missing(user)
if not user_meta.accepts_dms and not in_dm:
self.logger.info("The user refuses to have DMs sent to them")
return False
try:
if not user.dm_channel:
await user.create_dm()
if not embed:
embed = createembed.DM(content)
content = None
await user.dm_channel.send(content=content, embed=embed, **kwargs)
return True
except Exception: # noqa
self.logger.error(f"DMs: Failed to DM, reason: \n{traceback.format_exc()}")
return False
async def checked_DM(self, user: nextcord.User, **kwargs) -> bool:
user_meta = config.Users.create_if_missing(user)
try:
return await self.send_DM(user, user_meta=user_meta, **kwargs)
except (nextcord.HTTPException, nextcord.Forbidden):
# user has blocked bot or does not take mutual-server DMs
return False
async def reply_to_msg(
self,
message: nextcord.Message,
content: Optional[str] = None,
propagate_reply: bool = True,
**kwargs,
) -> nextcord.Message:
self.logger.info("Replying to a message", extra=common.message_info(message))
# use this line if you're trying to debug discord throwing code 400s
# self.logger.debug(jsonpickle.dumps(dict(content=content, **kwargs), indent=2))
pingee = message.author
if propagate_reply and message.reference is not None:
reference = message.reference
if (referenced_message := message.reference.cached_message) is not None:
pingee = referenced_message.author
if referenced_message.author == self.user:
reference = message
else:
reference = message
if self.owo and content is not None:
content = owo.owoize(content)
if isinstance(reference, nextcord.MessageReference):
reference.fail_if_not_exists = False
try:
return await message.channel.send(content, reference=reference, **kwargs)
except (nextcord.HTTPException, nextcord.Forbidden):
if content and pingee.mention not in content:
content += f"\n-# {pingee.mention} ↩️"
return await message.channel.send(content, **kwargs)
async def reply_question(
self, message: nextcord.Message, question: Optional[str] = None, **kwargs
) -> tuple[str, Optional[nextcord.Attachment]]:
await self.reply_to_msg(message, question, **kwargs)
def check(message2: nextcord.Message):
nonlocal message
return message2.author == message.author
try:
response: nextcord.Message = await self.wait_for("message", timeout=120.0, check=check)
except asyncio.TimeoutError:
await self.reply_to_msg(message, "Timed out and aborted after 120 seconds.")
raise asyncio.TimeoutError
return response.content, response.attachments[0] if response.attachments else None
async def reply_yes_or_no(self, message: nextcord.Message, question: Optional[str] = None, **kwargs) -> bool:
response, _ = await self.reply_question(message, question, **kwargs)
s = response.strip().lower()
if s in ("1", "true", "yes", "y", "on", "oui"):
return True
elif s in ("0", "false", "no", "n", "off", "non"):
return False
else:
await self.reply_to_msg(message, "Invalid bool string. Aborting")
raise ValueError(f"Could not convert {s} to bool")
async def on_message(self, message: nextcord.Message):
self.logger.info("Processing a message", extra=common.message_info(message))
if (is_bot := message.author.bot) or not self.is_running():
self.logger.info(
f"OnMessage: Didn't read message because {'the sender was a bot' if is_bot else 'I am dead'}."
)
return
if isinstance(message.channel, nextcord.DMChannel):
self.logger.info("Processing a DM", extra=common.message_info(message))
if message.content.lower() == "start":
config.Users.fetch(message.author.id).accepts_dms = True
self.logger.info("A user now accepts to receive DMs", extra=common.message_info(message))
await self.reply_to_msg(message, "You will now receive direct messages from me again! If you change your mind, send a message that says `stop`.")
return
elif message.content.lower() == "stop":
config.Users.fetch(message.author.id).accepts_dms = False
self.logger.info("A user now refuses to receive DMs", extra=common.message_info(message))
await self.reply_to_msg(message, "You will no longer receive direct messages from me! To resume, send a message that says `start`.")
return
removed = await self.MediaOnly.process_message(message)
if not removed:
before, space, after = message.content.partition(" ")
# if the prefix is the only thing before the space then this isn't a command
if before.startswith(self.command_prefix) and len(before) > 1:
self.logger.info("Processing commands")
await self.process_commands(message)
else:
_reacted = await self.Crashes.process_message(message)
self.logger.info("Finished processing a message", extra=common.message_info(message))
async def repository_query(self, query: str):
self.logger.info(f"SMR query of length {len(query)} requested")
async with await self.web_session.post("https://api.ficsit.app/v2/query", json={"query": query}) as response:
response.raise_for_status()
self.logger.info(f"SMR query returned with response {response.status}")
value = await response.json()
self.logger.info("SMR response decoded")
return value
async def async_url_get(self, url: str, /, get: type = bytes) -> str | bytes | dict:
async with self.web_session.get(url) as response:
self.logger.info(f"Requested {get} from {url} with response {response.status}")
if get == dict:
rtn = await response.json()
else:
content = await response.read()
rtn = content.decode() if get == str else content
self.logger.info(f"Data has length of {len(rtn)}")
return rtn
async def obtain_attachment(self, url: str) -> nextcord.File:
async with self.web_session.get(url) as resp:
buff = io.BytesIO(await resp.read())
_, filename = split(urlparse(url).path)
return nextcord.File(filename=filename, fp=buff, force_close=True)