mirror of
https://github.com/osmarks/autobotrobot
synced 2025-01-04 21:00:27 +00:00
modularize it, add mostly working phone function
This commit is contained in:
parent
50e4c22565
commit
f902ca0d64
30
src/db.py
30
src/db.py
@ -21,16 +21,44 @@ CREATE TABLE reminders (
|
|||||||
expired INTEGER NOT NULL,
|
expired INTEGER NOT NULL,
|
||||||
extra TEXT NOT NULL
|
extra TEXT NOT NULL
|
||||||
);
|
);
|
||||||
|
""",
|
||||||
|
"""
|
||||||
|
CREATE TABLE telephone_config (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
guild_id INTEGER NOT NULL,
|
||||||
|
channel_id INTEGER NOT NULL UNIQUE,
|
||||||
|
webhook TEXT
|
||||||
|
);
|
||||||
|
""",
|
||||||
|
"""
|
||||||
|
CREATE TABLE calls (
|
||||||
|
from_id TEXT NOT NULL REFERENCES telephone_config(id) UNIQUE,
|
||||||
|
to_id TEXT NOT NULL REFERENCES telephone_config(id),
|
||||||
|
start_time INTEGER NOT NULL
|
||||||
|
);
|
||||||
"""
|
"""
|
||||||
]
|
]
|
||||||
|
|
||||||
|
async def execute_fetchone(self, sql, params=None):
|
||||||
|
if params == None: params = ()
|
||||||
|
return await self._execute(self._fetchone, sql, params)
|
||||||
|
|
||||||
|
def _fetchone(self, sql, params):
|
||||||
|
cursor = self._conn.execute(sql, params)
|
||||||
|
return cursor.fetchone()
|
||||||
|
|
||||||
async def init(db_path):
|
async def init(db_path):
|
||||||
db = await aiosqlite.connect(db_path)
|
db = await aiosqlite.connect(db_path)
|
||||||
|
await db.execute("PRAGMA foreign_keys = ON")
|
||||||
|
|
||||||
|
db.row_factory = aiosqlite.Row
|
||||||
|
aiosqlite.Connection._fetchone = _fetchone
|
||||||
|
aiosqlite.Connection.execute_fetchone = execute_fetchone
|
||||||
|
|
||||||
version = (await (await db.execute("PRAGMA user_version")).fetchone())[0]
|
version = (await (await db.execute("PRAGMA user_version")).fetchone())[0]
|
||||||
for i in range(version, len(migrations)):
|
for i in range(version, len(migrations)):
|
||||||
await db.executescript(migrations[i])
|
await db.executescript(migrations[i])
|
||||||
# Normally this would be a terrible idea because of SQL injection.
|
# Normally interpolating like this would be a terrible idea because of SQL injection.
|
||||||
# However, in this case there is not an obvious alternative (the parameter-based way apparently doesn't work)
|
# However, in this case there is not an obvious alternative (the parameter-based way apparently doesn't work)
|
||||||
# and i + 1 will always be an integer anyway
|
# and i + 1 will always be an integer anyway
|
||||||
await db.execute(f"PRAGMA user_version = {i + 1}")
|
await db.execute(f"PRAGMA user_version = {i + 1}")
|
||||||
|
49
src/debug.py
Normal file
49
src/debug.py
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
import util
|
||||||
|
import asyncio
|
||||||
|
import traceback
|
||||||
|
from discord.ext import commands
|
||||||
|
|
||||||
|
def setup(bot):
|
||||||
|
async def admin_check(ctx):
|
||||||
|
return await bot.is_owner(ctx.author)
|
||||||
|
|
||||||
|
@bot.group()
|
||||||
|
@commands.check(admin_check)
|
||||||
|
async def magic(ctx):
|
||||||
|
if ctx.invoked_subcommand == None:
|
||||||
|
return await ctx.send("Invalid magic command.")
|
||||||
|
|
||||||
|
@magic.command(rest_is_raw=True)
|
||||||
|
async def py(ctx, *, code):
|
||||||
|
code = util.extract_codeblock(code)
|
||||||
|
try:
|
||||||
|
loc = {
|
||||||
|
**locals(),
|
||||||
|
"bot": bot,
|
||||||
|
"ctx": ctx,
|
||||||
|
"db": bot.database
|
||||||
|
}
|
||||||
|
result = await asyncio.wait_for(util.async_exec(code, loc, globals()), timeout=5.0)
|
||||||
|
if result != None:
|
||||||
|
if isinstance(result, str):
|
||||||
|
await ctx.send(result[:1999])
|
||||||
|
else:
|
||||||
|
await ctx.send(util.gen_codeblock(repr(result)))
|
||||||
|
except TimeoutError:
|
||||||
|
await ctx.send(embed=util.error_embed("Timed out."))
|
||||||
|
except BaseException as e:
|
||||||
|
await ctx.send(embed=util.error_embed(util.gen_codeblock(traceback.format_exc())))
|
||||||
|
|
||||||
|
@magic.command(rest_is_raw=True)
|
||||||
|
async def sql(ctx, *, code):
|
||||||
|
code = util.extract_codeblock(code)
|
||||||
|
try:
|
||||||
|
csr = bot.database.execute(code)
|
||||||
|
out = ""
|
||||||
|
async with csr as cursor:
|
||||||
|
async for row in cursor:
|
||||||
|
out += " ".join(map(repr, row)) + "\n"
|
||||||
|
await ctx.send(util.gen_codeblock(out))
|
||||||
|
await bot.database.commit()
|
||||||
|
except Exception as e:
|
||||||
|
await ctx.send(embed=util.error_embed(util.gen_codeblock(traceback.format_exc())))
|
202
src/main.py
202
src/main.py
@ -11,37 +11,18 @@ import argparse
|
|||||||
import traceback
|
import traceback
|
||||||
import random
|
import random
|
||||||
import rolldice
|
import rolldice
|
||||||
from datetime import timezone, datetime
|
|
||||||
|
|
||||||
import tio
|
import tio
|
||||||
import db
|
import db
|
||||||
import util
|
import util
|
||||||
|
|
||||||
def timestamp(): return int(datetime.now(tz=timezone.utc).timestamp())
|
config = util.config
|
||||||
|
|
||||||
# TODO refactor this
|
|
||||||
database = None
|
|
||||||
|
|
||||||
config = toml.load(open("config.toml", "r"))
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(message)s", datefmt="%H:%M:%S %d/%m/%Y")
|
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(message)s", datefmt="%H:%M:%S %d/%m/%Y")
|
||||||
|
|
||||||
bot = commands.Bot(command_prefix=config["prefix"], description="AutoBotRobot, the most useless bot in the known universe.", case_insensitive=True)
|
bot = commands.Bot(command_prefix=config["prefix"], description="AutoBotRobot, the most useless bot in the known universe.", case_insensitive=True)
|
||||||
bot._skip_check = lambda x, y: False
|
bot._skip_check = lambda x, y: False
|
||||||
|
|
||||||
def make_embed(*, fields=[], footer_text=None, **kwargs):
|
|
||||||
embed = discord.Embed(**kwargs)
|
|
||||||
for field in fields:
|
|
||||||
if len(field) > 2:
|
|
||||||
embed.add_field(name=field[0], value=field[1], inline=field[2])
|
|
||||||
else:
|
|
||||||
embed.add_field(name=field[0], value=field[1], inline=False)
|
|
||||||
if footer_text:
|
|
||||||
embed.set_footer(text=footer_text)
|
|
||||||
return embed
|
|
||||||
|
|
||||||
def error_embed(msg, title="Error"):
|
|
||||||
return make_embed(color=config["colors"]["error"], description=msg, title=title)
|
|
||||||
|
|
||||||
cleaner = discord.ext.commands.clean_content()
|
cleaner = discord.ext.commands.clean_content()
|
||||||
def clean(ctx, text):
|
def clean(ctx, text):
|
||||||
@ -53,19 +34,20 @@ async def on_message(message):
|
|||||||
if len(words) == 10 and message.author.id == 435756251205468160:
|
if len(words) == 10 and message.author.id == 435756251205468160:
|
||||||
await message.channel.send(util.unlyric(message.content))
|
await message.channel.send(util.unlyric(message.content))
|
||||||
else:
|
else:
|
||||||
if message.author.id == bot.user.id: return
|
if message.author == bot.user or message.author.discriminator == "0000": return
|
||||||
ctx = await bot.get_context(message)
|
ctx = await bot.get_context(message)
|
||||||
if not ctx.valid: return
|
if not ctx.valid: return
|
||||||
await bot.invoke(ctx)
|
await bot.invoke(ctx)
|
||||||
|
|
||||||
@bot.event
|
@bot.event
|
||||||
async def on_command_error(ctx, err):
|
async def on_command_error(ctx, err):
|
||||||
print(ctx, err)
|
#print(ctx, err)
|
||||||
if isinstance(err, commands.CommandNotFound, commands.CheckFailure): return
|
if isinstance(err, (commands.CommandNotFound, commands.CheckFailure)): return
|
||||||
try:
|
try:
|
||||||
trace = re.sub("\n\n+", "\n", "\n".join(traceback.format_exception(err, err, err.__traceback__)))
|
trace = re.sub("\n\n+", "\n", "\n".join(traceback.format_exception(err, err, err.__traceback__)))
|
||||||
print(trace)
|
#print(trace)
|
||||||
await ctx.send(embed=error_embed(gen_codeblock(trace), title="Internal error"))
|
logging.error("command error occured (in %s)", ctx.invoked_with, exc_info=err)
|
||||||
|
await ctx.send(embed=util.error_embed(util.gen_codeblock(trace), title="Internal error"))
|
||||||
except Exception as e: print("meta-error:", e)
|
except Exception as e: print("meta-error:", e)
|
||||||
|
|
||||||
@bot.command(help="Gives you a random fortune as generated by `fortune`.")
|
@bot.command(help="Gives you a random fortune as generated by `fortune`.")
|
||||||
@ -84,13 +66,13 @@ async def ping(ctx):
|
|||||||
async def delete(ctx, *, raw_target):
|
async def delete(ctx, *, raw_target):
|
||||||
target = await clean(ctx, raw_target.strip().replace("\n", " "))
|
target = await clean(ctx, raw_target.strip().replace("\n", " "))
|
||||||
if len(target) > 256:
|
if len(target) > 256:
|
||||||
await ctx.send(embed=error_embed("Deletion target must be max 256 chars"))
|
await ctx.send(embed=util.error_embed("Deletion target must be max 256 chars"))
|
||||||
return
|
return
|
||||||
async with ctx.typing():
|
async with ctx.typing():
|
||||||
await ctx.send(f"Deleting {target}...")
|
await ctx.send(f"Deleting {target}...")
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
await database.execute("INSERT INTO deleted_items (timestamp, item) VALUES (?, ?)", (timestamp(), target))
|
await bot.database.execute("INSERT INTO deleted_items (timestamp, item) VALUES (?, ?)", (util.timestamp(), target))
|
||||||
await database.commit()
|
await bot.database.commit()
|
||||||
await ctx.send(f"Deleted {target} successfully.")
|
await ctx.send(f"Deleted {target} successfully.")
|
||||||
|
|
||||||
@bot.command(help="View recently deleted things, optionally matching a filter.")
|
@bot.command(help="View recently deleted things, optionally matching a filter.")
|
||||||
@ -99,9 +81,9 @@ async def list_deleted(ctx, search=None):
|
|||||||
if search: acc = f"Recently deleted (matching {search}):\n"
|
if search: acc = f"Recently deleted (matching {search}):\n"
|
||||||
csr = None
|
csr = None
|
||||||
if search:
|
if search:
|
||||||
csr = database.execute("SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100", (f"%{search}%",))
|
csr = bot.database.execute("SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100", (f"%{search}%",))
|
||||||
else:
|
else:
|
||||||
csr = database.execute("SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100")
|
csr = bot.database.execute("SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100")
|
||||||
async with csr as cursor:
|
async with csr as cursor:
|
||||||
async for row in cursor:
|
async for row in cursor:
|
||||||
to_add = "- " + row[2].replace("```", "[REDACTED]") + "\n"
|
to_add = "- " + row[2].replace("```", "[REDACTED]") + "\n"
|
||||||
@ -124,20 +106,17 @@ exec_flag_parser = NonExitingArgumentParser(add_help=False)
|
|||||||
exec_flag_parser.add_argument("--verbose", "-v", action="store_true")
|
exec_flag_parser.add_argument("--verbose", "-v", action="store_true")
|
||||||
exec_flag_parser.add_argument("--language", "-L")
|
exec_flag_parser.add_argument("--language", "-L")
|
||||||
|
|
||||||
def gen_codeblock(content):
|
|
||||||
return "```\n" + content.replace("```", "\\`\\`\\`")[:1900] + "\n```"
|
|
||||||
|
|
||||||
@bot.command(rest_is_raw=True, help="Execute provided code (in a codeblock) using TIO.run.")
|
@bot.command(rest_is_raw=True, help="Execute provided code (in a codeblock) using TIO.run.")
|
||||||
async def exec(ctx, *, arg):
|
async def exec(ctx, *, arg):
|
||||||
match = re.match(EXEC_REGEX, arg, flags=re.DOTALL)
|
match = re.match(EXEC_REGEX, arg, flags=re.DOTALL)
|
||||||
if match == None:
|
if match == None:
|
||||||
await ctx.send(embed=error_embed("Invalid format. Expected a codeblock."))
|
await ctx.send(embed=util.error_embed("Invalid format. Expected a codeblock."))
|
||||||
return
|
return
|
||||||
flags_raw = match.group(1)
|
flags_raw = match.group(1)
|
||||||
flags = exec_flag_parser.parse_args(flags_raw.split())
|
flags = exec_flag_parser.parse_args(flags_raw.split())
|
||||||
lang = flags.language or match.group(2)
|
lang = flags.language or match.group(2)
|
||||||
if not lang:
|
if not lang:
|
||||||
await ctx.send(embed=error_embed("No language specified. Use the -L flag or add a language to your codeblock."))
|
await ctx.send(embed=util.error_embed("No language specified. Use the -L flag or add a language to your codeblock."))
|
||||||
return
|
return
|
||||||
lang = lang.strip()
|
lang = lang.strip()
|
||||||
code = match.group(3)
|
code = match.group(3)
|
||||||
@ -145,11 +124,11 @@ async def exec(ctx, *, arg):
|
|||||||
async with ctx.typing():
|
async with ctx.typing():
|
||||||
ok, real_lang, result, debug = await tio.run(lang, code)
|
ok, real_lang, result, debug = await tio.run(lang, code)
|
||||||
if not ok:
|
if not ok:
|
||||||
await ctx.send(embed=error_embed(gen_codeblock(result), "Execution failed"))
|
await ctx.send(embed=util.error_embed(util.gen_codeblock(result), "Execution failed"))
|
||||||
else:
|
else:
|
||||||
out = result
|
out = result
|
||||||
if flags.verbose:
|
if flags.verbose:
|
||||||
debug_block = "\n" + gen_codeblock(f"""{debug}\nLanguage: {real_lang}""")
|
debug_block = "\n" + util.gen_codeblock(f"""{debug}\nLanguage: {real_lang}""")
|
||||||
out = out[:2000 - len(debug_block)] + debug_block
|
out = out[:2000 - len(debug_block)] + debug_block
|
||||||
else:
|
else:
|
||||||
out = out[:2000]
|
out = out[:2000]
|
||||||
@ -167,93 +146,6 @@ async def supported_langs(ctx, search=None):
|
|||||||
if acc == "": acc = "No results."
|
if acc == "": acc = "No results."
|
||||||
await ctx.send(acc)
|
await ctx.send(acc)
|
||||||
|
|
||||||
@bot.command(brief="Set a reminder to be reminded about later.", rest_is_raw=True, help="""Sets a reminder which you will (probably) be reminded about at/after the specified time.
|
|
||||||
All times are UTC.
|
|
||||||
Reminders are checked every minute, so while precise times are not guaranteed, reminders should under normal conditions be received within 2 minutes of what you specify.""")
|
|
||||||
async def remind(ctx, time, *, reminder):
|
|
||||||
reminder = reminder.strip()
|
|
||||||
if len(reminder) > 512:
|
|
||||||
await ctx.send(embed=error_embed("Maximum reminder length is 512 characters", "Foolish user error"))
|
|
||||||
return
|
|
||||||
extra_data = {
|
|
||||||
"author_id": ctx.author.id,
|
|
||||||
"channel_id": ctx.message.channel.id,
|
|
||||||
"message_id": ctx.message.id,
|
|
||||||
"guild_id": ctx.message.guild and ctx.message.guild.id,
|
|
||||||
"original_time_spec": time
|
|
||||||
}
|
|
||||||
try:
|
|
||||||
time = util.parse_time(time)
|
|
||||||
except:
|
|
||||||
await ctx.send(embed=error_embed("Invalid time"))
|
|
||||||
return
|
|
||||||
await database.execute("INSERT INTO reminders (remind_timestamp, created_timestamp, reminder, expired, extra) VALUES (?, ?, ?, ?, ?)",
|
|
||||||
(time.timestamp(), timestamp(), reminder, 0, json.dumps(extra_data, separators=(',', ':'))))
|
|
||||||
await database.commit()
|
|
||||||
await ctx.send(f"Reminder scheduled for {util.format_time(time)}.")
|
|
||||||
|
|
||||||
async def send_to_channel(info, text):
|
|
||||||
channel = bot.get_channel(info["channel_id"])
|
|
||||||
if not channel: raise Exception(f"channel {info['channel_id']} unavailable/nonexistent")
|
|
||||||
await channel.send(text)
|
|
||||||
|
|
||||||
async def send_by_dm(info, text):
|
|
||||||
user = bot.get_user(info["author_id"])
|
|
||||||
if not user: raise Exception(f"user {info['author_id']} unavailable/nonexistent")
|
|
||||||
if not user.dm_channel: await user.create_dm()
|
|
||||||
await user.dm_channel.send(text)
|
|
||||||
|
|
||||||
async def send_to_guild(info, text):
|
|
||||||
guild = bot.get_guild(info["guild_id"])
|
|
||||||
member = guild.get_member(info["author_id"])
|
|
||||||
self = guild.get_member(bot.user.id)
|
|
||||||
# if member is here, find a channel they can read and the bot can send in
|
|
||||||
if member:
|
|
||||||
for chan in guild.text_channels:
|
|
||||||
if chan.permissions_for(member).read_messages and chan.permissions_for(self).send_messages:
|
|
||||||
await chan.send(text)
|
|
||||||
return
|
|
||||||
# if member not here or no channel they can read messages in, send to any available channel
|
|
||||||
for chan in guild.text_channels:
|
|
||||||
if chan.permissions_for(self).send_messages:
|
|
||||||
await chan.send(text)
|
|
||||||
return
|
|
||||||
raise Exception(f"guild {info['author_id']} has no (valid) channels")
|
|
||||||
|
|
||||||
remind_send_methods = [
|
|
||||||
("original channel", send_to_channel),
|
|
||||||
("direct message", send_by_dm),
|
|
||||||
("originating guild", send_to_guild)
|
|
||||||
]
|
|
||||||
|
|
||||||
@tasks.loop(seconds=60)
|
|
||||||
async def remind_worker():
|
|
||||||
csr = database.execute("SELECT * FROM reminders WHERE expired = 0 AND remind_timestamp < ?", (timestamp(),))
|
|
||||||
to_expire = []
|
|
||||||
async with csr as cursor:
|
|
||||||
async for row in cursor:
|
|
||||||
rid, remind_timestamp, created_timestamp, reminder_text, _, extra = row
|
|
||||||
try:
|
|
||||||
remind_timestamp = datetime.utcfromtimestamp(remind_timestamp)
|
|
||||||
created_timestamp = datetime.utcfromtimestamp(created_timestamp)
|
|
||||||
extra = json.loads(extra)
|
|
||||||
uid = extra["author_id"]
|
|
||||||
text = f"<@{uid}> Reminder queued at {util.format_time(created_timestamp)}: {reminder_text}"
|
|
||||||
|
|
||||||
for method_name, func in remind_send_methods:
|
|
||||||
print("trying", method_name, rid)
|
|
||||||
try:
|
|
||||||
await func(extra, text)
|
|
||||||
to_expire.append(rid)
|
|
||||||
break
|
|
||||||
except Exception as e: logging.warning("failed to send %d to %s", rid, method_name, exc_info=e)
|
|
||||||
except Exception as e:
|
|
||||||
logging.warning("Could not send reminder %d", rid, exc_info=e)
|
|
||||||
for expiry_id in to_expire:
|
|
||||||
logging.info("Expiring reminder %d", expiry_id)
|
|
||||||
await database.execute("UPDATE reminders SET expired = 1 WHERE id = ?", (expiry_id,))
|
|
||||||
await database.commit()
|
|
||||||
|
|
||||||
@bot.command(help="Get some information about the bot.")
|
@bot.command(help="Get some information about the bot.")
|
||||||
async def about(ctx):
|
async def about(ctx):
|
||||||
await ctx.send("""**AutoBotRobot: The least useful Discord bot ever designed.**
|
await ctx.send("""**AutoBotRobot: The least useful Discord bot ever designed.**
|
||||||
@ -304,68 +196,23 @@ async def random_choice(ctx, *choices):
|
|||||||
for choice in choices:
|
for choice in choices:
|
||||||
counts[choice] = counts.get(choice, 0) + 1
|
counts[choice] = counts.get(choice, 0) + 1
|
||||||
await ctx.send("\n".join(map(lambda x: f"{x[0]} x{x[1]}", counts.items())))
|
await ctx.send("\n".join(map(lambda x: f"{x[0]} x{x[1]}", counts.items())))
|
||||||
|
|
||||||
async def admin_check(ctx):
|
|
||||||
if not await bot.is_owner(ctx.author):
|
|
||||||
# apparently this has to be a pure function because ++help calls it for some reason because of course
|
|
||||||
#await ctx.send(embed=error_embed(f"{ctx.author.name} is not in the sudoers file. This incident has been reported."))
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
@bot.check
|
@bot.check
|
||||||
async def andrew_bad(ctx):
|
async def andrew_bad(ctx):
|
||||||
return ctx.message.author.id != 543131534685765673
|
return ctx.message.author.id != 543131534685765673
|
||||||
|
|
||||||
@bot.group()
|
|
||||||
@commands.check(admin_check)
|
|
||||||
async def magic(ctx):
|
|
||||||
if ctx.invoked_subcommand == None:
|
|
||||||
return await ctx.send("Invalid magic command.")
|
|
||||||
|
|
||||||
@magic.command(rest_is_raw=True)
|
|
||||||
async def py(ctx, *, code):
|
|
||||||
code = util.extract_codeblock(code)
|
|
||||||
try:
|
|
||||||
loc = {
|
|
||||||
**locals(),
|
|
||||||
"bot": bot,
|
|
||||||
"ctx": ctx,
|
|
||||||
"db": database
|
|
||||||
}
|
|
||||||
result = await asyncio.wait_for(util.async_exec(code, loc, globals()), timeout=5.0)
|
|
||||||
if result != None:
|
|
||||||
if isinstance(result, str):
|
|
||||||
await ctx.send(result[:1999])
|
|
||||||
else:
|
|
||||||
await ctx.send(gen_codeblock(repr(result)))
|
|
||||||
except TimeoutError:
|
|
||||||
await ctx.send(embed=error_embed("Timed out."))
|
|
||||||
except BaseException as e:
|
|
||||||
await ctx.send(embed=error_embed(gen_codeblock(traceback.format_exc())))
|
|
||||||
|
|
||||||
@magic.command(rest_is_raw=True)
|
|
||||||
async def sql(ctx, *, code):
|
|
||||||
code = util.extract_codeblock(code)
|
|
||||||
try:
|
|
||||||
csr = database.execute(code)
|
|
||||||
out = ""
|
|
||||||
async with csr as cursor:
|
|
||||||
async for row in cursor:
|
|
||||||
out += " ".join(map(repr, row)) + "\n"
|
|
||||||
await ctx.send(gen_codeblock(out))
|
|
||||||
await database.commit()
|
|
||||||
except Exception as e:
|
|
||||||
await ctx.send(embed=error_embed(gen_codeblock(traceback.format_exc())))
|
|
||||||
|
|
||||||
@bot.event
|
@bot.event
|
||||||
async def on_ready():
|
async def on_ready():
|
||||||
logging.info("Connected as " + bot.user.name)
|
logging.info("Connected as " + bot.user.name)
|
||||||
await bot.change_presence(status=discord.Status.online, activity=discord.Activity(type=discord.ActivityType.listening, name=f"commands beginning with {config['prefix']}"))
|
await bot.change_presence(status=discord.Status.online, activity=discord.Activity(type=discord.ActivityType.listening, name=f"commands beginning with {bot.command_prefix}"))
|
||||||
remind_worker.start()
|
|
||||||
|
|
||||||
async def run_bot():
|
async def run_bot():
|
||||||
global database
|
bot.database = await db.init(config["database"])
|
||||||
database = await db.init(config["database"])
|
for ext in (
|
||||||
|
"reminders",
|
||||||
|
"debug",
|
||||||
|
"telephone"
|
||||||
|
):
|
||||||
|
bot.load_extension(ext)
|
||||||
await bot.start(config["token"])
|
await bot.start(config["token"])
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
@ -373,7 +220,6 @@ if __name__ == '__main__':
|
|||||||
try:
|
try:
|
||||||
loop.run_until_complete(run_bot())
|
loop.run_until_complete(run_bot())
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
remind_worker.cancel()
|
|
||||||
loop.run_until_complete(bot.logout())
|
loop.run_until_complete(bot.logout())
|
||||||
finally:
|
finally:
|
||||||
loop.close()
|
loop.close()
|
||||||
|
100
src/reminders.py
Normal file
100
src/reminders.py
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
import discord.ext.tasks as tasks
|
||||||
|
|
||||||
|
import util
|
||||||
|
|
||||||
|
def setup(bot):
|
||||||
|
@bot.command(brief="Set a reminder to be reminded about later.", rest_is_raw=True, help="""Sets a reminder which you will (probably) be reminded about at/after the specified time.
|
||||||
|
All times are UTC.
|
||||||
|
Reminders are checked every minute, so while precise times are not guaranteed, reminders should under normal conditions be received within 2 minutes of what you specify.""")
|
||||||
|
async def remind(ctx, time, *, reminder):
|
||||||
|
reminder = reminder.strip()
|
||||||
|
if len(reminder) > 512:
|
||||||
|
await ctx.send(embed=util.error_embed("Maximum reminder length is 512 characters", "Foolish user error"))
|
||||||
|
return
|
||||||
|
extra_data = {
|
||||||
|
"author_id": ctx.author.id,
|
||||||
|
"channel_id": ctx.message.channel.id,
|
||||||
|
"message_id": ctx.message.id,
|
||||||
|
"guild_id": ctx.message.guild and ctx.message.guild.id,
|
||||||
|
"original_time_spec": time
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
time = util.parse_time(time)
|
||||||
|
except:
|
||||||
|
await ctx.send(embed=util.error_embed("Invalid time"))
|
||||||
|
return
|
||||||
|
await bot.database.execute("INSERT INTO reminders (remind_timestamp, created_timestamp, reminder, expired, extra) VALUES (?, ?, ?, ?, ?)",
|
||||||
|
(time.timestamp(), util.timestamp(), reminder, 0, util.json_encode(extra_data)))
|
||||||
|
await bot.database.commit()
|
||||||
|
await ctx.send(f"Reminder scheduled for {util.format_time(time)}.")
|
||||||
|
|
||||||
|
async def send_to_channel(info, text):
|
||||||
|
channel = bot.get_channel(info["channel_id"])
|
||||||
|
if not channel: raise Exception(f"channel {info['channel_id']} unavailable/nonexistent")
|
||||||
|
await channel.send(text)
|
||||||
|
|
||||||
|
async def send_by_dm(info, text):
|
||||||
|
user = bot.get_user(info["author_id"])
|
||||||
|
if not user: raise Exception(f"user {info['author_id']} unavailable/nonexistent")
|
||||||
|
if not user.dm_channel: await user.create_dm()
|
||||||
|
await user.dm_channel.send(text)
|
||||||
|
|
||||||
|
async def send_to_guild(info, text):
|
||||||
|
guild = bot.get_guild(info["guild_id"])
|
||||||
|
member = guild.get_member(info["author_id"])
|
||||||
|
self = guild.get_member(bot.user.id)
|
||||||
|
# if member is here, find a channel they can read and the bot can send in
|
||||||
|
if member:
|
||||||
|
for chan in guild.text_channels:
|
||||||
|
if chan.permissions_for(member).read_messages and chan.permissions_for(self).send_messages:
|
||||||
|
await chan.send(text)
|
||||||
|
return
|
||||||
|
# if member not here or no channel they can read messages in, send to any available channel
|
||||||
|
for chan in guild.text_channels:
|
||||||
|
if chan.permissions_for(self).send_messages:
|
||||||
|
await chan.send(text)
|
||||||
|
return
|
||||||
|
raise Exception(f"guild {info['author_id']} has no (valid) channels")
|
||||||
|
|
||||||
|
remind_send_methods = [
|
||||||
|
("original channel", send_to_channel),
|
||||||
|
("direct message", send_by_dm),
|
||||||
|
("originating guild", send_to_guild)
|
||||||
|
]
|
||||||
|
|
||||||
|
@tasks.loop(seconds=60)
|
||||||
|
async def remind_worker():
|
||||||
|
csr = bot.database.execute("SELECT * FROM reminders WHERE expired = 0 AND remind_timestamp < ?", (util.timestamp(),))
|
||||||
|
to_expire = []
|
||||||
|
async with csr as cursor:
|
||||||
|
async for row in cursor:
|
||||||
|
rid, remind_timestamp, created_timestamp, reminder_text, _, extra = row
|
||||||
|
try:
|
||||||
|
remind_timestamp = datetime.utcfromtimestamp(remind_timestamp)
|
||||||
|
created_timestamp = datetime.utcfromtimestamp(created_timestamp)
|
||||||
|
extra = json.loads(extra)
|
||||||
|
uid = extra["author_id"]
|
||||||
|
text = f"<@{uid}> Reminder queued at {util.format_time(created_timestamp)}: {reminder_text}"
|
||||||
|
|
||||||
|
for method_name, func in remind_send_methods:
|
||||||
|
print("trying", method_name, rid)
|
||||||
|
try:
|
||||||
|
await func(extra, text)
|
||||||
|
to_expire.append(rid)
|
||||||
|
break
|
||||||
|
except Exception as e: logging.warning("failed to send %d to %s", rid, method_name, exc_info=e)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning("Could not send reminder %d", rid, exc_info=e)
|
||||||
|
for expiry_id in to_expire:
|
||||||
|
logging.info("Expiring reminder %d", expiry_id)
|
||||||
|
await bot.database.execute("UPDATE reminders SET expired = 1 WHERE id = ?", (expiry_id,))
|
||||||
|
await bot.database.commit()
|
||||||
|
|
||||||
|
remind_worker.start()
|
||||||
|
bot.remind_worker = remind_worker
|
||||||
|
|
||||||
|
def teardown(bot):
|
||||||
|
bot.remind_worker.cancel()
|
197
src/telephone.py
Normal file
197
src/telephone.py
Normal file
@ -0,0 +1,197 @@
|
|||||||
|
from discord.ext import commands
|
||||||
|
import discord
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import util
|
||||||
|
|
||||||
|
# Generate a "phone" address
|
||||||
|
# Not actually for phones
|
||||||
|
def generate_address(ctx):
|
||||||
|
h = hashlib.blake2b(str(ctx.guild.id).encode("utf-8")).digest()
|
||||||
|
words = open("wordlist-8192.txt").readlines()
|
||||||
|
out = ""
|
||||||
|
for i in range(3):
|
||||||
|
out += words[int.from_bytes(h[i * 2:i * 2 + 3], "little") % 8192].strip().title()
|
||||||
|
return out
|
||||||
|
|
||||||
|
channel_calls_cache = {}
|
||||||
|
|
||||||
|
def setup(bot):
|
||||||
|
async def server_mod_check(ctx):
|
||||||
|
return ctx.author.permissions_in(ctx.channel).manage_channels or bot.is_owner(ctx.author)
|
||||||
|
|
||||||
|
@bot.group(name="apiotelephone", aliases=["tel", "tele", "telephone", "apiotel"], brief="ApioTelephone lets you 'call' other servers.", help=f"""
|
||||||
|
Call other (participating) servers with ApioTelephone! To start, do {bot.command_prefix} tel setup (requires Manage Channels).
|
||||||
|
It's recommended that you give the bot Manage Webhooks permissions in this channel so that it can use webhook calls mode.
|
||||||
|
|
||||||
|
""")
|
||||||
|
async def telephone(ctx): pass
|
||||||
|
|
||||||
|
async def get_channel_config(channel):
|
||||||
|
return await bot.database.execute_fetchone("SELECT * FROM telephone_config WHERE channel_id = ?", (channel,))
|
||||||
|
|
||||||
|
async def get_addr_config(addr):
|
||||||
|
return await bot.database.execute_fetchone("SELECT * FROM telephone_config WHERE id = ?", (addr,))
|
||||||
|
|
||||||
|
def cache_call(chan, other, other_wh):
|
||||||
|
x = channel_calls_cache.get(chan)
|
||||||
|
if not x:
|
||||||
|
x = []
|
||||||
|
channel_calls_cache[chan] = x
|
||||||
|
# append other end and associated webhook
|
||||||
|
x.append((other, other_wh))
|
||||||
|
|
||||||
|
def uncache_call(chan, other):
|
||||||
|
# remove other end
|
||||||
|
l = channel_calls_cache[chan]
|
||||||
|
for i, (c, _) in enumerate(l):
|
||||||
|
if c == other: l.pop(i)
|
||||||
|
|
||||||
|
async def populate_cache(db):
|
||||||
|
for row in await db.execute_fetchall("""SELECT tcf.channel_id AS from_channel, tct.channel_id AS to_channel, tcf.webhook AS from_webhook, tct.webhook AS to_webhook FROM calls
|
||||||
|
JOIN telephone_config AS tcf ON tcf.id = calls.from_id JOIN telephone_config AS tct ON tct.id = calls.to_id"""):
|
||||||
|
cache_call(row["from_channel"], row["to_channel"], row["to_webhook"])
|
||||||
|
cache_call(row["to_channel"], row["from_channel"], row["from_webhook"])
|
||||||
|
|
||||||
|
bot.loop.create_task(populate_cache(bot.database))
|
||||||
|
|
||||||
|
@bot.listen("on_message")
|
||||||
|
async def forward_call_messages(message):
|
||||||
|
calls = channel_calls_cache.get(message.channel.id, None)
|
||||||
|
if not calls: return
|
||||||
|
if (message.author.discriminator == "0000" and message.author.bot) or message.author == bot.user or message.content == "": # check if webhook, from itself, or only has embeds
|
||||||
|
return
|
||||||
|
async def send_to(call):
|
||||||
|
other_channel, other_webhook = call
|
||||||
|
if other_webhook:
|
||||||
|
await discord.Webhook.from_url(other_webhook, adapter=discord.AsyncWebhookAdapter(bot.http._HTTPClient__session)).send(
|
||||||
|
content=message.content, username=message.author.name, avatar_url=message.author.avatar_url,
|
||||||
|
allowed_mentions=discord.AllowedMentions(everyone=False))
|
||||||
|
else:
|
||||||
|
m = f"**{message.author.name}**: "
|
||||||
|
m += message.content[:2000 - len(m)]
|
||||||
|
await bot.get_channel(other_channel).send(m)
|
||||||
|
await asyncio.gather(*map(send_to, calls))
|
||||||
|
|
||||||
|
@telephone.command()
|
||||||
|
@commands.check(server_mod_check)
|
||||||
|
async def setup(ctx):
|
||||||
|
await ctx.send("Configuring...")
|
||||||
|
num = generate_address(ctx)
|
||||||
|
await ctx.send(f"Your address is {num}.")
|
||||||
|
info = await get_addr_config(num)
|
||||||
|
webhook = None
|
||||||
|
if info: webhook = info["webhook"]
|
||||||
|
if not info or not webhook:
|
||||||
|
try:
|
||||||
|
webhook = (await ctx.channel.create_webhook(name="incoming message display", reason="configure for apiotelephone")).url
|
||||||
|
await ctx.send("Created webhook.")
|
||||||
|
except discord.Forbidden as f:
|
||||||
|
logging.warn("could not create webhook in #%s %s", ctx.channel.name, ctx.guild.name, exc_info=f)
|
||||||
|
await ctx.send("Webhook creation failed - please ensure permissions are available. This is not necessary but is recommended.")
|
||||||
|
await bot.database.execute("INSERT OR REPLACE INTO telephone_config VALUES (?, ?, ?, ?)", (num, ctx.guild.id, ctx.channel.id, webhook))
|
||||||
|
await bot.database.commit()
|
||||||
|
await ctx.send("Configured.")
|
||||||
|
|
||||||
|
@telephone.command(aliases=["call"])
|
||||||
|
async def dial(ctx, address):
|
||||||
|
# basic checks - ensure this is a phone channel and has no other open calls
|
||||||
|
channel_info = await get_channel_config(ctx.channel.id)
|
||||||
|
if not channel_info: return await ctx.send(embed=util.error_embed("Not in a phone channel."))
|
||||||
|
originating_address = channel_info["id"]
|
||||||
|
if address == originating_address: return await ctx.send(embed=util.error_embed("A channel cannot dial itself. That means *you*, Gibson."))
|
||||||
|
recv_info = await get_addr_config(address)
|
||||||
|
if not recv_info: return await ctx.send(embed=util.error_embed("Destination address not found. Please check for typos and/or antimemes."))
|
||||||
|
|
||||||
|
current_call = await bot.database.execute_fetchone("SELECT * FROM calls WHERE from_id = ?", (originating_address,))
|
||||||
|
if current_call: return await ctx.send(embed=util.error_embed(f"A call is already open (to {current_call['to_id']}) from this channel. Currently, only one outgoing call is permitted at a time."))
|
||||||
|
|
||||||
|
# post embed in the receiving channel prompting people to accept/decline call
|
||||||
|
recv_channel = bot.get_channel(recv_info["channel_id"])
|
||||||
|
_, call_message = await asyncio.gather(
|
||||||
|
ctx.send(embed=util.info_embed("Outgoing call", f"Dialing {address}...")),
|
||||||
|
recv_channel.send(embed=util.info_embed("Incoming call",
|
||||||
|
f"Call from {originating_address}. Click :white_check_mark: to accept or :negative_squared_cross_mark: to decline."))
|
||||||
|
)
|
||||||
|
# add clickable reactions to it
|
||||||
|
await asyncio.gather(
|
||||||
|
call_message.add_reaction("✅"),
|
||||||
|
call_message.add_reaction("❎")
|
||||||
|
)
|
||||||
|
|
||||||
|
def check(re, u): return (str(re.emoji) == "✅" or str(re.emoji) == "❎") and u != bot.user
|
||||||
|
|
||||||
|
# wait until someone clicks the reactions, or time out and say so
|
||||||
|
try:
|
||||||
|
reaction, user = await bot.wait_for("reaction_add", timeout=util.config["call_timeout"], check=check)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
await asyncio.gather(
|
||||||
|
ctx.send(embed=util.error_embed("Timed out", "Outgoing call timed out - the other end did not pick up.")),
|
||||||
|
recv_channel.send(embed=util.error_embed("Timed out", "Call timed out - no response in time"))
|
||||||
|
)
|
||||||
|
|
||||||
|
await asyncio.gather(call_message.remove_reaction("✅", bot.user), call_message.remove_reaction("❎", bot.user))
|
||||||
|
em = str(reaction.emoji)
|
||||||
|
if em == "✅": # accept call
|
||||||
|
await bot.database.execute("INSERT INTO calls VALUES (?, ?, ?)", (originating_address, address, util.timestamp()))
|
||||||
|
await bot.database.commit()
|
||||||
|
await asyncio.gather(
|
||||||
|
ctx.send(embed=util.info_embed("Outgoing call", "Call accepted and connected.")),
|
||||||
|
recv_channel.send(embed=util.info_embed("Incoming call", "Call accepted and connected."))
|
||||||
|
)
|
||||||
|
cache_call(ctx.channel.id, recv_channel.id, recv_info["webhook"])
|
||||||
|
cache_call(recv_channel.id, ctx.channel.id, channel_info["webhook"])
|
||||||
|
elif em == "❎": # drop call
|
||||||
|
await ctx.send(embed=util.error_embed("Your call was declined.", "Call declined"))
|
||||||
|
|
||||||
|
async def get_calls(addr):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@telephone.command(aliases=["disconnect", "quit"])
|
||||||
|
async def hangup(ctx):
|
||||||
|
channel_info = await get_channel_config(ctx.channel.id)
|
||||||
|
addr = channel_info["id"]
|
||||||
|
if not channel_info: return await ctx.send(embed=util.error_embed("Not in a phone channel."))
|
||||||
|
from_here = await bot.database.execute_fetchone("SELECT * FROM calls WHERE from_id = ?", (addr,))
|
||||||
|
to_here = await bot.database.execute_fetchone("SELECT * FROM calls WHERE to_id = ?", (addr,))
|
||||||
|
if (not to_here) and (not from_here): return await ctx.send(embed=util.error_embed("No calls are active."))
|
||||||
|
|
||||||
|
other = None
|
||||||
|
if from_here:
|
||||||
|
other = from_here["to_id"]
|
||||||
|
await bot.database.execute("DELETE FROM calls WHERE from_id = ? AND to_id = ?", (addr, other))
|
||||||
|
elif to_here:
|
||||||
|
other = to_here["from_id"]
|
||||||
|
await bot.database.execute("DELETE FROM calls WHERE to_id = ? AND from_id = ?", (addr, other))
|
||||||
|
await bot.database.commit()
|
||||||
|
other_channel = (await get_addr_config(other))["channel_id"]
|
||||||
|
|
||||||
|
uncache_call(ctx.channel.id, other_channel)
|
||||||
|
uncache_call(other_channel, ctx.channel.id)
|
||||||
|
|
||||||
|
await asyncio.gather(
|
||||||
|
ctx.send(embed=util.info_embed("Hung up", f"Call to {other} disconnected.")),
|
||||||
|
bot.get_channel(other_channel).send(embed=util.info_embed("Hung up", f"Call to {addr} disconnected."))
|
||||||
|
)
|
||||||
|
|
||||||
|
@telephone.command(aliases=["status"])
|
||||||
|
async def info(ctx):
|
||||||
|
channel_info = await get_channel_config(ctx.channel.id)
|
||||||
|
if not channel_info: return await ctx.send(embed=util.info_embed("Phone status", "Not a phone channel"))
|
||||||
|
addr = channel_info['id']
|
||||||
|
title = f"{addr} status"
|
||||||
|
|
||||||
|
fields = []
|
||||||
|
|
||||||
|
now = datetime.utcnow()
|
||||||
|
def delta(ts):
|
||||||
|
return util.format_timedelta(datetime.utcfromtimestamp(ts), now)
|
||||||
|
|
||||||
|
incoming = await bot.database.execute_fetchall("SELECT * FROM calls WHERE to_id = ?", (addr,))
|
||||||
|
fields.extend(map(lambda x: ["Incoming call", f"From {x['from_id']} - for {delta(x['start_time'])}"], incoming))
|
||||||
|
outgoing = await bot.database.execute_fetchall("SELECT * FROM calls WHERE from_id = ?", (addr,))
|
||||||
|
fields.extend(map(lambda x: ["Outgoing call", f"To {x['from_id']} - for {delta(x['start_time'])}"], outgoing))
|
||||||
|
await ctx.send(embed=util.info_embed(title, f"Connected: {len(incoming) + len(outgoing)}", fields))
|
46
src/util.py
46
src/util.py
@ -5,6 +5,13 @@ import ast
|
|||||||
import copy
|
import copy
|
||||||
import random
|
import random
|
||||||
from dateutil.relativedelta import relativedelta
|
from dateutil.relativedelta import relativedelta
|
||||||
|
import json
|
||||||
|
import discord
|
||||||
|
import toml
|
||||||
|
|
||||||
|
config = toml.load(open("config.toml", "r"))
|
||||||
|
|
||||||
|
def timestamp(): return int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
|
||||||
|
|
||||||
# from here: https://github.com/Rapptz/RoboDanny/blob/18b92ae2f53927aedebc25fb5eca02c8f6d7a874/cogs/utils/time.py
|
# from here: https://github.com/Rapptz/RoboDanny/blob/18b92ae2f53927aedebc25fb5eca02c8f6d7a874/cogs/utils/time.py
|
||||||
short_timedelta_regex = re.compile("""
|
short_timedelta_regex = re.compile("""
|
||||||
@ -20,11 +27,11 @@ def parse_short_timedelta(text):
|
|||||||
match = short_timedelta_regex.fullmatch(text)
|
match = short_timedelta_regex.fullmatch(text)
|
||||||
if match is None or not match.group(0): raise ValueError("parse failed")
|
if match is None or not match.group(0): raise ValueError("parse failed")
|
||||||
data = { k: int(v) for k, v in match.groupdict(default=0).items() }
|
data = { k: int(v) for k, v in match.groupdict(default=0).items() }
|
||||||
return datetime.datetime.utcnow() + relativedelta(**data)
|
return datetime.datetime.now(tz=datetime.timezone.utc) + relativedelta(**data)
|
||||||
|
|
||||||
cal = parsedatetime.Calendar()
|
cal = parsedatetime.Calendar()
|
||||||
def parse_humantime(text):
|
def parse_humantime(text):
|
||||||
time_struct, parse_status = cal.parse(text)
|
time_struct, parse_status = cal.nlp(text)
|
||||||
if parse_status == 1: return datetime.datetime(*time_struct[:6])
|
if parse_status == 1: return datetime.datetime(*time_struct[:6])
|
||||||
else: raise ValueError("parse failed")
|
else: raise ValueError("parse failed")
|
||||||
|
|
||||||
@ -40,6 +47,22 @@ def parse_time(text):
|
|||||||
def format_time(dt):
|
def format_time(dt):
|
||||||
return dt.strftime("%H:%M:%S %d/%m/%Y")
|
return dt.strftime("%H:%M:%S %d/%m/%Y")
|
||||||
|
|
||||||
|
timeparts = (
|
||||||
|
("y", "years"),
|
||||||
|
("mo", "months"),
|
||||||
|
("d", "days"),
|
||||||
|
("m", "minutes"),
|
||||||
|
("s", "seconds")
|
||||||
|
)
|
||||||
|
|
||||||
|
def format_timedelta(from_, to):
|
||||||
|
d = relativedelta(to, from_)
|
||||||
|
out = "0s" if d.seconds == 0 else ""
|
||||||
|
for short, attr in timeparts:
|
||||||
|
x = getattr(d, attr)
|
||||||
|
if x != 0: out += str(x) + short
|
||||||
|
return out
|
||||||
|
|
||||||
CODEBLOCK_REGEX = "^[^`]*```[a-zA-Z0-9_\-+]*\n(.+)```$"
|
CODEBLOCK_REGEX = "^[^`]*```[a-zA-Z0-9_\-+]*\n(.+)```$"
|
||||||
CODELINE_REGEX = "^[^`]*`(.*)`$"
|
CODELINE_REGEX = "^[^`]*`(.*)`$"
|
||||||
def extract_codeblock(s):
|
def extract_codeblock(s):
|
||||||
@ -72,6 +95,20 @@ async def async_exec(code, loc, glob):
|
|||||||
exec(compile(wrapper, "<repl>", "exec"), loc, glob)
|
exec(compile(wrapper, "<repl>", "exec"), loc, glob)
|
||||||
return await (loc.get("repl_coroutine") or glob.get("repl_coroutine"))()
|
return await (loc.get("repl_coroutine") or glob.get("repl_coroutine"))()
|
||||||
|
|
||||||
|
def make_embed(*, fields=(), footer_text=None, **kwargs):
|
||||||
|
embed = discord.Embed(**kwargs)
|
||||||
|
for field in fields:
|
||||||
|
if len(field) > 2:
|
||||||
|
embed.add_field(name=field[0], value=field[1], inline=field[2])
|
||||||
|
else:
|
||||||
|
embed.add_field(name=field[0], value=field[1], inline=False)
|
||||||
|
if footer_text:
|
||||||
|
embed.set_footer(text=footer_text)
|
||||||
|
return embed
|
||||||
|
|
||||||
|
def error_embed(msg, title="Error"): return make_embed(color=config["colors"]["error"], description=msg, title=title)
|
||||||
|
def info_embed(title, msg, fields=()): return make_embed(color=config["colors"]["info"], description=msg, title=title, fields=fields)
|
||||||
|
|
||||||
# https://github.com/LyricLy/Esobot/blob/bcc9e548c84ea9b23fc832d0b0aaa8288de64886/cogs/general.py
|
# https://github.com/LyricLy/Esobot/blob/bcc9e548c84ea9b23fc832d0b0aaa8288de64886/cogs/general.py
|
||||||
lyrictable_raw = {
|
lyrictable_raw = {
|
||||||
"a": "а",
|
"a": "а",
|
||||||
@ -134,3 +171,8 @@ def apioform():
|
|||||||
|
|
||||||
def unlyric(text):
|
def unlyric(text):
|
||||||
return text.translate(lyrictable).replace("\u200b", "")
|
return text.translate(lyrictable).replace("\u200b", "")
|
||||||
|
|
||||||
|
def gen_codeblock(content):
|
||||||
|
return "```\n" + content.replace("```", "\\`\\`\\`")[:1900] + "\n```"
|
||||||
|
|
||||||
|
def json_encode(x): return json.dumps(x, separators=(',', ':'))
|
8192
wordlist-8192.txt
Normal file
8192
wordlist-8192.txt
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user