mirror of
https://github.com/osmarks/autobotrobot
synced 2024-06-16 10:09:59 +00:00
Remove problems
This commit is contained in:
parent
6d18a5c56e
commit
ffeea2281c
|
@ -1,72 +0,0 @@
|
|||
from discord.ext import commands
|
||||
import discord
|
||||
import logging
|
||||
import asyncio
|
||||
import discord
|
||||
from datetime import datetime
|
||||
import re
|
||||
import collections
|
||||
|
||||
import util
|
||||
|
||||
Achievement = collections.namedtuple("Achievement", ["name", "condition", "description"])
|
||||
|
||||
achievements = {
|
||||
"spectre_of_communism": Achievement("Containment Efforts Ongoing", "Refer to the 'spectre of communism' in a message.", "A spectre is haunting Europe. The spectre of communism. Containment efforts are ongoing and full containment is projected by 2036."),
|
||||
"test": Achievement("Test", "Test achievement. Obtained for testing.", "Great job, you ran the test command!"),
|
||||
"spam": Achievement("beesbeesbeesbeesbeesbeesbeesbeesbees", "Send a long message containing the same thing repeatedly.", "You should probably not do this, nobody* likes spam!"),
|
||||
"unicode_abuse": Achievement("Anomalous Unicode", "Send a high proportion of weird Unicode characters in a message.", "h̵͖̻̮̗̹̆͛͆̎ͮͤͫ͛ͦ̓̅ͤ́͢é͒ͧ̌̀ͪ̈͂̈́̉ͣ̅̿̄̌̋̿̽̚͏̛͏͚̯͉̟͇̼̹͎ͅa̠̹̘͎̫̜̞̩͖̟̟͍͇͈͍̝͕͛ͥ͊̾̈́ͩͯͩͭ̆̋͐͗̉͋̓̀͝v͎͖̜͎͔̞͚͉̺̞̘̥͖̝͚̺̍ͤ̌͂ͨ̃̅ͫ̿͛ͯ̓̉̆̎͊̀̚̕͟s̪̠̟̣̝̹̭̻̈́ͤ͗̏ͮ̂ͯ̈́̊ͩ̓̆̌̆͌̽̓̈́̚͢͞e̛̞̙̜̗̰͕͕͎̺͍̭̲̟̭̲̫̬͓ͯ̅̓̆̂̔̃͟r̷̛̮̮͇̳̳̾ͯͮͩ̏͂ͤ̿̽ͧ͒͋́̕ͅͅv̴̠͉̼̮̭̘ͪͯͦ͌́ͯ̒̃̀́̃͜͝ͅe̵̷̢͕̣̻̥̲͓̼͍̱͕̮̯̱̤̹̱̝̎̓̈́̿ͤ̔̍ͭͭ͐ͅŗ̔ͮͯ͂́͏̻͈̱ͅ ̣͇̼͊̄ͫ̆̍̄̀̀̓͊͐͋̌͘͠į̱͔̰̭̫̱̫̊ͪ̅ͥ̈́ͥ̐͌̅ͪ̅ͨ̎̀͘͝s̍͑̌̋̅͌͂ͨͬͯ̇͊҉̛̱̺͕̰͓̗̖̬͡͡ ̥̤̺̖̪̪́ͯͣ̏̅̈ͣ̿̀͠͠͞i̢̛̭̰̻͈̦̣̮̞̤̩̊̌̾͛ͭͦ̆ͮ̃̎ͪ̔ͬ͊̆͂ͫͅn̸̖͚̣̪̩̏ͥ̈́̅ͯ̔͆́ͦ͗͛͒̃̃ͫ͟͜͝͠ȩ̸͎̟̣̞͉̫̗̙̻̯͍̰̣̌ͪͨ͛̆̕͡v̙͙̲͕͔̦̣̺͔̖͉̜̲̩̈̿ͥ̎͊̈́̊ͯͯ͒ͭ̊̀͢i̪͈̣̱̞̥̰̟̣̩̼̻̪̳̤͇̻̹͉͗ͭ͆̆̎̀͑͑̆͋̏̏͊ͣͦ͆ͣ̈́̓͟͢ţ̵̘̫̯͓̻̗͕̘͙̯̞̪̪̲̤̬̜͕ͫ̄̌̓̎͌ͧ̔͟͢ͅa̸̧̭̲̯̳̔́͋̐͂̇ͪ̔̐́̚͢b͐̅̔ͭ͗̊̂̾̀̓ͭͭ͑ͤ̏̐̃ͩͬ҉̞̼̮̤̝̲̳͓̗̤̫̭̝̹̙͘͟͝ļ̷͈̭̖͓̜̬͔̻͔̀̎ͯ͗̐̽̏ͦ̊͗ͧ́͘ͅe̢͍̦̗̬̝̠͔̳̣̯̮̣̹͍͙̞̜ͣ̉͆̊̀̎ͦ͌̂̋̊ͨ͛́"),
|
||||
"rtfm": Achievement("RTFM", "Tell someone to read the documentation.", "Apparently, people won't do this without prompting half the time."),
|
||||
"error": Achievement("You broke it", "Cause an internal error in the bot", "I should probably fix this.")
|
||||
}
|
||||
|
||||
async def achieve(bot: commands.Bot, message: discord.Message, achievement):
|
||||
guild_conf = await bot.database.execute_fetchone("SELECT achievement_messages FROM guild_config WHERE id = ?", (message.guild.id,))
|
||||
if guild_conf and guild_conf["achievement_messages"] == 0: return
|
||||
channel = message.channel
|
||||
|
||||
uid = message.author.id
|
||||
# ensure the user doesn't have achievements off
|
||||
conf = await bot.database.execute_fetchone("SELECT * FROM user_config WHERE id = ?", (uid,))
|
||||
if conf and conf["achievement_tracking_enabled"] == 0: return
|
||||
if not conf:
|
||||
await bot.database.execute("INSERT INTO user_config VALUES (?, NULL)", (uid,))
|
||||
await bot.database.commit()
|
||||
# detect if achievement already earned
|
||||
if await bot.database.execute_fetchone("SELECT 1 FROM achievements WHERE user_id = ? AND achievement = ?", (uid, achievement)):
|
||||
return
|
||||
achievement_info = achievements[achievement]
|
||||
description = f"Congratulations, {message.author.name}#{message.author.discriminator}! You achieved the achievement __{achievement_info.name}__.\n\n{achievement_info.description}\n*{achievement_info.condition}*"
|
||||
e = util.make_embed(description=description, title="Achievement achieved!", color=util.hashbow(achievement))
|
||||
e.set_thumbnail(url=await util.get_asset(bot, f"achievements/{achievement}.png"))
|
||||
await channel.send(embed=e)
|
||||
await bot.database.execute("INSERT INTO achievements VALUES (?, ?, ?)", (uid, achievement, util.timestamp()))
|
||||
await bot.database.commit()
|
||||
logging.info("awarded achievement %s to %s", message.author.name, achievement)
|
||||
|
||||
def setup(bot):
|
||||
@bot.group(name="achievements", aliases=["ach", "achieve", "achievement"], brief="Achieve a wide variety of fun achievements!", help=f"""
|
||||
Do things and get arbitrary achievements for them!
|
||||
Note that due to reasons messages for achievements will not be shown except in opted-in servers, although achievements will be gained regardless.
|
||||
""")
|
||||
async def achievements(ctx): pass
|
||||
|
||||
@achievements.command(help="Enable/disable achievement messages on this guild.")
|
||||
@commands.check(util.server_mod_check)
|
||||
async def set_enabled(ctx, on: bool):
|
||||
await bot.database.execute("INSERT OR REPLACE INTO guild_config VALUES (?, ?)", (ctx.guild.id, int(on)))
|
||||
await bot.database.commit()
|
||||
await ctx.send(f"Achievement messages set to: {on}")
|
||||
|
||||
@achievements.command(help="Obtain a test achievement")
|
||||
async def test(ctx):
|
||||
await achieve(ctx.bot, ctx.message, "test")
|
||||
|
||||
@bot.listen("on_message")
|
||||
async def message_listener(msg: discord.Message):
|
||||
content = msg.content
|
||||
content_len = len(msg.content)
|
||||
if re.match("spect(re|er).{,20}(communism|☭)", content, re.IGNORECASE): await achieve(bot, msg, "spectre_of_communism")
|
||||
if re.match(r"^(.+)\1+$", content) and len(content) >= 1950: await achieve(bot, msg, "spam")
|
||||
if content_len > 30 and (len(re.findall("[\u0300-\u036f\U00040000-\U0010FFFF]", content)) / content_len) > 0.35: await achieve(bot, msg, "unicode_abuse")
|
||||
if re.match("(RTFM|(read|look|view|use).{,8}(document|man(page|ual)s?|instruction))", content, re.IGNORECASE): await achieve(bot, msg, "rtfm")
|
99
src/db.py
99
src/db.py
|
@ -1,99 +0,0 @@
|
|||
import aiosqlite
|
||||
import logging
|
||||
|
||||
migrations = [
|
||||
"""
|
||||
CREATE TABLE deleted_items (
|
||||
id INTEGER PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
item TEXT NOT NULL
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE INDEX deleted_items_timestamps ON deleted_items(timestamp);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE reminders (
|
||||
id INTEGER PRIMARY KEY,
|
||||
remind_timestamp INTEGER NOT NULL,
|
||||
created_timestamp INTEGER NOT NULL,
|
||||
reminder TEXT NOT NULL,
|
||||
expired INTEGER NOT NULL,
|
||||
extra TEXT NOT NULL
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE telephone_config (
|
||||
id TEXT PRIMARY KEY,
|
||||
guild_id INTEGER NOT NULL,
|
||||
channel_id INTEGER NOT NULL UNIQUE,
|
||||
webhook TEXT
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE calls (
|
||||
from_id TEXT NOT NULL REFERENCES telephone_config(id) UNIQUE,
|
||||
to_id TEXT NOT NULL REFERENCES telephone_config(id),
|
||||
start_time INTEGER NOT NULL
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE guild_config (
|
||||
id INTEGER PRIMARY KEY,
|
||||
achievement_messages INTEGER
|
||||
);
|
||||
|
||||
CREATE TABLE user_config (
|
||||
id INTEGER PRIMARY KEY,
|
||||
achievement_tracking_enabled INTEGER
|
||||
);
|
||||
|
||||
CREATE TABLE stats (
|
||||
user_id INTEGER NOT NULL REFERENCES user_config (id),
|
||||
stat TEXT NOT NULL COLLATE NOCASE,
|
||||
value BLOB NOT NULL,
|
||||
UNIQUE (user_id, stat)
|
||||
);
|
||||
|
||||
CREATE TABLE achievements (
|
||||
user_id INTEGER NOT NULL REFERENCES user_config (id),
|
||||
achievement TEXT NOT NULL,
|
||||
achieved_time INTEGER NOT NULL,
|
||||
UNIQUE (user_id, achievement)
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE assets (
|
||||
identifier TEXT PRIMARY KEY,
|
||||
url TEXT NOT NULL
|
||||
);
|
||||
"""
|
||||
]
|
||||
|
||||
async def execute_fetchone(self, sql, params=None):
|
||||
if params == None: params = ()
|
||||
return await self._execute(self._fetchone, sql, params)
|
||||
|
||||
def _fetchone(self, sql, params):
|
||||
cursor = self._conn.execute(sql, params)
|
||||
return cursor.fetchone()
|
||||
|
||||
async def init(db_path):
|
||||
db = await aiosqlite.connect(db_path)
|
||||
await db.execute("PRAGMA foreign_keys = ON")
|
||||
|
||||
db.row_factory = aiosqlite.Row
|
||||
aiosqlite.Connection._fetchone = _fetchone
|
||||
aiosqlite.Connection.execute_fetchone = execute_fetchone
|
||||
|
||||
version = (await (await db.execute("PRAGMA user_version")).fetchone())[0]
|
||||
for i in range(version, len(migrations)):
|
||||
await db.executescript(migrations[i])
|
||||
# Normally interpolating like this would be a terrible idea because of SQL injection.
|
||||
# However, in this case there is not an obvious alternative (the parameter-based way apparently doesn't work)
|
||||
# and i + 1 will always be an integer anyway
|
||||
await db.execute(f"PRAGMA user_version = {i + 1}")
|
||||
await db.commit()
|
||||
logging.info(f"Migrated DB to schema {i + 1}")
|
||||
|
||||
return db
|
73
src/debug.py
73
src/debug.py
|
@ -1,73 +0,0 @@
|
|||
import util
|
||||
import asyncio
|
||||
import traceback
|
||||
import re
|
||||
from discord.ext import commands
|
||||
import util
|
||||
|
||||
def setup(bot):
|
||||
@bot.group()
|
||||
@commands.check(util.admin_check)
|
||||
async def magic(ctx):
|
||||
if ctx.invoked_subcommand == None:
|
||||
return await ctx.send("Invalid magic command.")
|
||||
|
||||
@magic.command(rest_is_raw=True)
|
||||
async def py(ctx, *, code):
|
||||
timeout = 5.0
|
||||
timeout_match = re.search("#timeout:([0-9]+)", code, re.IGNORECASE)
|
||||
if timeout_match:
|
||||
timeout = int(timeout_match.group(1))
|
||||
if timeout == 0: timeout = None
|
||||
code = util.extract_codeblock(code)
|
||||
try:
|
||||
loc = {
|
||||
**locals(),
|
||||
"bot": bot,
|
||||
"ctx": ctx,
|
||||
"db": bot.database
|
||||
}
|
||||
|
||||
def check(re, u): return str(re.emoji) == "❌" and u == ctx.author
|
||||
|
||||
result = None
|
||||
async def run():
|
||||
nonlocal result
|
||||
result = await util.async_exec(code, loc, globals())
|
||||
halt_task = asyncio.create_task(bot.wait_for("reaction_add", check=check))
|
||||
exec_task = asyncio.create_task(run())
|
||||
done, pending = await asyncio.wait((exec_task, halt_task), timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
|
||||
for task in done: task.result() # get exceptions
|
||||
for task in pending: task.cancel()
|
||||
if result != None:
|
||||
if isinstance(result, str):
|
||||
await ctx.send(result[:2000])
|
||||
else:
|
||||
await ctx.send(util.gen_codeblock(repr(result)))
|
||||
except (TimeoutError, asyncio.CancelledError):
|
||||
await ctx.send(embed=util.error_embed("Timed out."))
|
||||
except BaseException as e:
|
||||
await ctx.send(embed=util.error_embed(util.gen_codeblock(traceback.format_exc())))
|
||||
|
||||
@magic.command(rest_is_raw=True)
|
||||
async def sql(ctx, *, code):
|
||||
code = util.extract_codeblock(code)
|
||||
try:
|
||||
csr = bot.database.execute(code)
|
||||
out = ""
|
||||
async with csr as cursor:
|
||||
async for row in cursor:
|
||||
out += " ".join(map(repr, row)) + "\n"
|
||||
await ctx.send(util.gen_codeblock(out))
|
||||
await bot.database.commit()
|
||||
except Exception as e:
|
||||
await ctx.send(embed=util.error_embed(util.gen_codeblock(traceback.format_exc())))
|
||||
|
||||
@magic.command()
|
||||
async def reload_config(ctx):
|
||||
util.load_config()
|
||||
ctx.send("Done!")
|
||||
|
||||
@magic.command()
|
||||
async def reload_ext(ctx):
|
||||
for ext in util.extensions: bot.reload_extension(ext)
|
|
@ -1,13 +0,0 @@
|
|||
import util
|
||||
import random
|
||||
import logging
|
||||
import discord
|
||||
|
||||
def setup(bot):
|
||||
@bot.listen()
|
||||
async def on_member_join(member):
|
||||
if member.guild.id == util.config["heavserver"]["id"]:
|
||||
logging.info("%s (%d) joined heavserver", member.display_name, member.id)
|
||||
if member.bot:
|
||||
await member.add_roles(discord.utils.get(member.guild.roles, id=util.config["heavserver"]["quarantine_role"]))
|
||||
await member.add_roles(discord.utils.get(member.guild.roles, id=random.choice(util.config["heavserver"]["moderator_roles"][:])))
|
237
src/main.py
237
src/main.py
|
@ -1,237 +0,0 @@
|
|||
import discord
|
||||
import toml
|
||||
import logging
|
||||
import subprocess
|
||||
import discord.ext.commands as commands
|
||||
import discord.ext.tasks as tasks
|
||||
import re
|
||||
import asyncio
|
||||
import json
|
||||
import argparse
|
||||
import traceback
|
||||
import random
|
||||
import rolldice
|
||||
import collections
|
||||
import typing
|
||||
from numpy.random import default_rng
|
||||
|
||||
import tio
|
||||
import db
|
||||
import util
|
||||
import achievement
|
||||
|
||||
config = util.config
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(message)s", datefmt="%H:%M:%S %d/%m/%Y")
|
||||
|
||||
#intents = discord.Intents.default()
|
||||
#intents.members = True
|
||||
|
||||
bot = commands.Bot(command_prefix=config["prefix"], description="AutoBotRobot, the most useless bot in the known universe." + util.config.get("description_suffix", ""),
|
||||
case_insensitive=True, allowed_mentions=discord.AllowedMentions(everyone=False, users=True, roles=True))
|
||||
bot._skip_check = lambda x, y: False
|
||||
|
||||
cleaner = discord.ext.commands.clean_content()
|
||||
def clean(ctx, text):
|
||||
return cleaner.convert(ctx, text)
|
||||
|
||||
@bot.event
|
||||
async def on_message(message):
|
||||
words = message.content.split(" ")
|
||||
if len(words) == 10 and message.author.id == 435756251205468160:
|
||||
await message.channel.send(util.unlyric(message.content))
|
||||
else:
|
||||
if message.author == bot.user or message.author.discriminator == "0000": return
|
||||
ctx = await bot.get_context(message)
|
||||
if not ctx.valid: return
|
||||
await bot.invoke(ctx)
|
||||
|
||||
@bot.event
|
||||
async def on_command_error(ctx, err):
|
||||
#print(ctx, err)
|
||||
if isinstance(err, (commands.CommandNotFound, commands.CheckFailure)): return
|
||||
if isinstance(err, commands.MissingRequiredArgument): return await ctx.send(embed=util.error_embed(str(err)))
|
||||
try:
|
||||
trace = re.sub("\n\n+", "\n", "\n".join(traceback.format_exception(err, err, err.__traceback__)))
|
||||
logging.error("command error occured (in %s)", ctx.invoked_with, exc_info=err)
|
||||
await ctx.send(embed=util.error_embed(util.gen_codeblock(trace), title="Internal error"))
|
||||
await achievement.achieve(ctx.bot, ctx.message, "error")
|
||||
except Exception as e: print("meta-error:", e)
|
||||
|
||||
@bot.command(help="Gives you a random fortune as generated by `fortune`.")
|
||||
async def fortune(ctx):
|
||||
await ctx.send(subprocess.run(["fortune"], stdout=subprocess.PIPE, encoding="UTF-8").stdout)
|
||||
|
||||
@bot.command(help="Generates an apioform type.")
|
||||
async def apioform(ctx):
|
||||
await ctx.send(util.apioform())
|
||||
|
||||
@bot.command(help="Says Pong.")
|
||||
async def ping(ctx):
|
||||
await ctx.send("Pong.")
|
||||
|
||||
@bot.command(help="Deletes the specified target.", rest_is_raw=True)
|
||||
async def delete(ctx, *, raw_target):
|
||||
target = await clean(ctx, raw_target.strip().replace("\n", " "))
|
||||
if len(target) > 256:
|
||||
await ctx.send(embed=util.error_embed("Deletion target must be max 256 chars"))
|
||||
return
|
||||
async with ctx.typing():
|
||||
await ctx.send(f"Deleting {target}...")
|
||||
await asyncio.sleep(1)
|
||||
await bot.database.execute("INSERT INTO deleted_items (timestamp, item) VALUES (?, ?)", (util.timestamp(), target))
|
||||
await bot.database.commit()
|
||||
await ctx.send(f"Deleted {target} successfully.")
|
||||
|
||||
@bot.command(help="View recently deleted things, optionally matching a filter.")
|
||||
async def list_deleted(ctx, search=None):
|
||||
acc = "Recently deleted:\n"
|
||||
if search: acc = f"Recently deleted (matching {search}):\n"
|
||||
csr = None
|
||||
if search:
|
||||
csr = bot.database.execute("SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100", (f"%{search}%",))
|
||||
else:
|
||||
csr = bot.database.execute("SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100")
|
||||
async with csr as cursor:
|
||||
async for row in cursor:
|
||||
to_add = "- " + row[2].replace("```", "[REDACTED]") + "\n"
|
||||
if len(acc + to_add) > 2000:
|
||||
break
|
||||
acc += to_add
|
||||
await ctx.send(acc)
|
||||
|
||||
# Python, for some *very intelligent reason*, makes the default ArgumentParser exit the program on error.
|
||||
# This is obviously undesirable behavior in a Discord bot, so we override this.
|
||||
class NonExitingArgumentParser(argparse.ArgumentParser):
|
||||
def exit(self, status=0, message=None):
|
||||
if status:
|
||||
raise Exception(f'Flag parse error: {message}')
|
||||
exit(status)
|
||||
|
||||
EXEC_REGEX = "^(.*)```([a-zA-Z0-9_\\-+]+)?\n(.*)```$"
|
||||
|
||||
exec_flag_parser = NonExitingArgumentParser(add_help=False)
|
||||
exec_flag_parser.add_argument("--verbose", "-v", action="store_true")
|
||||
exec_flag_parser.add_argument("--language", "-L")
|
||||
|
||||
@bot.command(rest_is_raw=True, help="Execute provided code (in a codeblock) using TIO.run.")
|
||||
async def exec(ctx, *, arg):
|
||||
match = re.match(EXEC_REGEX, arg, flags=re.DOTALL)
|
||||
if match == None:
|
||||
await ctx.send(embed=util.error_embed("Invalid format. Expected a codeblock."))
|
||||
return
|
||||
flags_raw = match.group(1)
|
||||
flags = exec_flag_parser.parse_args(flags_raw.split())
|
||||
lang = flags.language or match.group(2)
|
||||
if not lang:
|
||||
await ctx.send(embed=util.error_embed("No language specified. Use the -L flag or add a language to your codeblock."))
|
||||
return
|
||||
lang = lang.strip()
|
||||
code = match.group(3)
|
||||
|
||||
async with ctx.typing():
|
||||
ok, real_lang, result, debug = await tio.run(lang, code)
|
||||
if not ok:
|
||||
await ctx.send(embed=util.error_embed(util.gen_codeblock(result), "Execution failed"))
|
||||
else:
|
||||
out = result
|
||||
if flags.verbose:
|
||||
debug_block = "\n" + util.gen_codeblock(f"""{debug}\nLanguage: {real_lang}""")
|
||||
out = out[:2000 - len(debug_block)] + debug_block
|
||||
else:
|
||||
out = out[:2000]
|
||||
await ctx.send(out)
|
||||
|
||||
@bot.command(help="List supported languages, optionally matching a filter.")
|
||||
async def supported_langs(ctx, search=None):
|
||||
langs = sorted(tio.languages())
|
||||
acc = ""
|
||||
for lang in langs:
|
||||
if len(acc + lang) > 2000:
|
||||
await ctx.send(acc)
|
||||
acc = ""
|
||||
if search == None or search in lang: acc += lang + " "
|
||||
if acc == "": acc = "No results."
|
||||
await ctx.send(acc)
|
||||
|
||||
@bot.command(help="Get some information about the bot.", aliases=["invite"])
|
||||
async def about(ctx):
|
||||
await ctx.send("""**AutoBotRobot: The least useful Discord bot ever designed.**
|
||||
AutoBotRobot has many features, but not necessarily any practical ones.
|
||||
It can execute code via TIO.run, do reminders, print fortunes, and not any more!
|
||||
AutoBotRobot is open source - the code is available at <https://github.com/osmarks/autobotrobot> - and you could run your own instance if you wanted to and could get around the complete lack of user guide or documentation.
|
||||
You can also invite it to your server: <https://discordapp.com/oauth2/authorize?&client_id=509849474647064576&scope=bot&permissions=68608>
|
||||
""")
|
||||
|
||||
@bot.command(help="Randomly generate an integer using dice syntax.", name="random", rest_is_raw=True)
|
||||
async def random_int(ctx, *, dice):
|
||||
await ctx.send("Disabled until CPU use restrictions can be done.")
|
||||
#await ctx.send(rolldice.roll_dice(dice)[0])
|
||||
|
||||
bad_things = ["lyric", "solarflame", "lyric", "319753218592866315", "andrew", "6", "c++", "☭", "communism"]
|
||||
good_things = ["potato", "heav", "gollark", "helloboi", "bee", "hellboy", "rust", "ferris", "crab", "transistor", "endos", "make esolang"]
|
||||
negations = ["not", "bad", "un", "kill", "n't", "¬", "counter"]
|
||||
def weight(thing):
|
||||
lthing = thing.lower()
|
||||
weight = 1.0
|
||||
if lthing == "c": weight *= 0.3
|
||||
for bad_thing in bad_things:
|
||||
if bad_thing in lthing: weight *= 0.5
|
||||
for good_thing in good_things:
|
||||
if good_thing in lthing: weight *= 2.0
|
||||
for negation in negations:
|
||||
for _ in range(lthing.count(negation)): weight = 1 / weight
|
||||
return weight
|
||||
|
||||
rng = default_rng()
|
||||
|
||||
@bot.command(help="'Randomly' choose between the specified options.", name="choice", aliases=["choose"])
|
||||
async def random_choice(ctx, *choices):
|
||||
choices = list(choices)
|
||||
samples = 1
|
||||
# apparently doing typing.Optional[int] doesn't work properly with this, so just bodge around it
|
||||
try:
|
||||
samples = int(choices[0])
|
||||
choices.pop(0)
|
||||
except: pass
|
||||
|
||||
if samples > 9223372036854775807 or samples < 1 or len(choices) < 1:
|
||||
await ctx.send("No.")
|
||||
return
|
||||
|
||||
# because of python weirdness, using sum() on the bare map iterator consumes it, which means we have to actually make a list
|
||||
weights = list(map(weight, choices))
|
||||
|
||||
if samples == 1: return await ctx.send(random.choices(choices, weights=weights, k=1)[0])
|
||||
|
||||
total = sum(weights)
|
||||
probabilities = list(map(lambda x: x / total, weights))
|
||||
results = map(lambda t: (choices[t[0]], t[1]), enumerate(rng.multinomial(samples, list(probabilities))))
|
||||
|
||||
await ctx.send("\n".join(map(lambda x: f"{x[0]} x{x[1]}", results)))
|
||||
|
||||
@bot.check
|
||||
async def andrew_bad(ctx):
|
||||
return ctx.message.author.id != 543131534685765673
|
||||
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
logging.info("Connected as " + bot.user.name)
|
||||
await bot.change_presence(status=discord.Status.online,
|
||||
activity=discord.Activity(name=f"{bot.command_prefix}help", type=discord.ActivityType.listening))
|
||||
|
||||
async def run_bot():
|
||||
bot.database = await db.init(config["database"])
|
||||
for ext in util.extensions:
|
||||
logging.info("loaded %s", ext)
|
||||
bot.load_extension(ext)
|
||||
await bot.start(config["token"])
|
||||
|
||||
if __name__ == '__main__':
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
loop.run_until_complete(run_bot())
|
||||
except KeyboardInterrupt:
|
||||
loop.run_until_complete(bot.logout())
|
||||
finally:
|
||||
loop.close()
|
109
src/reminders.py
109
src/reminders.py
|
@ -1,109 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
import discord.ext.tasks as tasks
|
||||
|
||||
import util
|
||||
|
||||
def setup(bot):
|
||||
@bot.command(brief="Set a reminder to be reminded about later.", rest_is_raw=True, help="""Sets a reminder which you will (probably) be reminded about at/after the specified time.
|
||||
All times are UTC.
|
||||
Reminders are checked every minute, so while precise times are not guaranteed, reminders should under normal conditions be received within 2 minutes of what you specify.
|
||||
Note that due to technical limitations reminders beyond the year 10000 CE or in the past cannot currently be handled.
|
||||
Note that reminder delivery is not guaranteed, due to possible issues including but not limited to: data loss, me eventually not caring, the failure of Discord (in this case message delivery will still be attempted manually on a case-by-case basis), the collapse of human civilization, or other existential risks.""")
|
||||
async def remind(ctx, time, *, reminder):
|
||||
reminder = reminder.strip()
|
||||
if len(reminder) > 512:
|
||||
await ctx.send(embed=util.error_embed("Maximum reminder length is 512 characters", "Foolish user error"))
|
||||
return
|
||||
extra_data = {
|
||||
"author_id": ctx.author.id,
|
||||
"channel_id": ctx.message.channel.id,
|
||||
"message_id": ctx.message.id,
|
||||
"guild_id": ctx.message.guild and ctx.message.guild.id,
|
||||
"original_time_spec": time
|
||||
}
|
||||
try:
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
time = util.parse_time(time)
|
||||
except:
|
||||
await ctx.send(embed=util.error_embed("Invalid time (wrong format/too large/non-integer months or years)"))
|
||||
return
|
||||
await bot.database.execute("INSERT INTO reminders (remind_timestamp, created_timestamp, reminder, expired, extra) VALUES (?, ?, ?, ?, ?)",
|
||||
(time.timestamp(), now.timestamp(), reminder, 0, util.json_encode(extra_data)))
|
||||
await bot.database.commit()
|
||||
await ctx.send(f"Reminder scheduled for {util.format_time(time)} ({util.format_timedelta(now, time)}).")
|
||||
|
||||
async def send_to_channel(info, text):
|
||||
channel = bot.get_channel(info["channel_id"])
|
||||
if not channel: raise Exception(f"channel {info['channel_id']} unavailable/nonexistent")
|
||||
await channel.send(text)
|
||||
|
||||
async def send_by_dm(info, text):
|
||||
user = bot.get_user(info["author_id"])
|
||||
if not user: raise Exception(f"user {info['author_id']} unavailable/nonexistent")
|
||||
if not user.dm_channel: await user.create_dm()
|
||||
await user.dm_channel.send(text)
|
||||
|
||||
async def send_to_guild(info, text):
|
||||
guild = bot.get_guild(info["guild_id"])
|
||||
member = guild.get_member(info["author_id"])
|
||||
self = guild.get_member(bot.user.id)
|
||||
# if member is here, find a channel they can read and the bot can send in
|
||||
if member:
|
||||
for chan in guild.text_channels:
|
||||
if chan.permissions_for(member).read_messages and chan.permissions_for(self).send_messages:
|
||||
await chan.send(text)
|
||||
return
|
||||
# if member not here or no channel they can read messages in, send to any available channel
|
||||
for chan in guild.text_channels:
|
||||
if chan.permissions_for(self).send_messages:
|
||||
await chan.send(text)
|
||||
return
|
||||
raise Exception(f"guild {info['author_id']} has no (valid) channels")
|
||||
|
||||
remind_send_methods = [
|
||||
("original channel", send_to_channel),
|
||||
("direct message", send_by_dm),
|
||||
("originating guild", send_to_guild)
|
||||
]
|
||||
|
||||
@tasks.loop(seconds=60)
|
||||
async def remind_worker():
|
||||
csr = bot.database.execute("SELECT * FROM reminders WHERE expired = 0 AND remind_timestamp < ?", (util.timestamp(),))
|
||||
to_expire = []
|
||||
async with csr as cursor:
|
||||
async for row in cursor:
|
||||
rid, remind_timestamp, created_timestamp, reminder_text, _, extra = row
|
||||
try:
|
||||
remind_timestamp = datetime.utcfromtimestamp(remind_timestamp)
|
||||
created_timestamp = datetime.utcfromtimestamp(created_timestamp)
|
||||
extra = json.loads(extra)
|
||||
uid = extra["author_id"]
|
||||
text = f"<@{uid}> Reminder queued at {util.format_time(created_timestamp)}: {reminder_text}"
|
||||
|
||||
for method_name, func in remind_send_methods:
|
||||
print("trying", method_name, rid)
|
||||
try:
|
||||
await func(extra, text)
|
||||
to_expire.append(rid)
|
||||
break
|
||||
except Exception as e: logging.warning("failed to send %d to %s", rid, method_name, exc_info=e)
|
||||
except Exception as e:
|
||||
logging.warning("Could not send reminder %d", rid, exc_info=e)
|
||||
for expiry_id in to_expire:
|
||||
logging.info("Expiring reminder %d", expiry_id)
|
||||
await bot.database.execute("UPDATE reminders SET expired = 1 WHERE id = ?", (expiry_id,))
|
||||
await bot.database.commit()
|
||||
|
||||
@remind_worker.before_loop
|
||||
async def before_remind_worker():
|
||||
logging.info("Waiting for bot readiness...")
|
||||
await bot.wait_until_ready()
|
||||
logging.info("Remind worker starting")
|
||||
|
||||
remind_worker.start()
|
||||
bot.remind_worker = remind_worker
|
||||
|
||||
def teardown(bot):
|
||||
bot.remind_worker.cancel()
|
181
src/telephone.py
181
src/telephone.py
|
@ -1,181 +0,0 @@
|
|||
from discord.ext import commands
|
||||
import discord
|
||||
import logging
|
||||
import asyncio
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
|
||||
import util
|
||||
|
||||
# Generate a "phone" address
|
||||
# Not actually for phones
|
||||
def generate_address(ctx):
|
||||
h = hashlib.blake2b(str(ctx.guild.id).encode("utf-8")).digest()
|
||||
words = open("wordlist-8192.txt").readlines()
|
||||
out = ""
|
||||
for i in range(3):
|
||||
out += words[int.from_bytes(h[i * 2:i * 2 + 3], "little") % 8192].strip().title()
|
||||
return out
|
||||
|
||||
def setup(bot):
|
||||
@bot.group(name="apiotelephone", aliases=["tel", "tele", "telephone", "apiotel"], brief="ApioTelephone lets you 'call' other servers.", help=f"""
|
||||
Call other (participating) servers with ApioTelephone! To configure a channel for telephony, do `{bot.command_prefix}tel setup` (requires Manage Channels).
|
||||
It's recommended that you give the bot Manage Webhooks permissions in this channel so that it can use webhook calls mode.
|
||||
To place a call, do `{bot.command_prefix}tel dial [number]` - the other end has to accept the call.
|
||||
When you want to end a call, do {bot.command_prefix}tel disconnect.
|
||||
""")
|
||||
async def telephone(ctx): pass
|
||||
|
||||
async def get_channel_config(channel):
|
||||
return await bot.database.execute_fetchone("SELECT * FROM telephone_config WHERE channel_id = ?", (channel,))
|
||||
|
||||
async def get_addr_config(addr):
|
||||
return await bot.database.execute_fetchone("SELECT * FROM telephone_config WHERE id = ?", (addr,))
|
||||
|
||||
@bot.listen("on_message")
|
||||
async def forward_call_messages(message):
|
||||
channel = message.channel.id
|
||||
if (message.author.discriminator == "0000" and message.author.bot) or message.author == bot.user or message.content == "": # check if webhook, from itself, or only has embeds
|
||||
return
|
||||
calls = await bot.database.execute_fetchall("""SELECT tcf.channel_id AS from_channel, tct.channel_id AS to_channel,
|
||||
tcf.webhook AS from_webhook, tct.webhook AS to_webhook FROM calls
|
||||
JOIN telephone_config AS tcf ON tcf.id = calls.from_id JOIN telephone_config AS tct ON tct.id = calls.to_id
|
||||
WHERE from_channel = ? OR to_channel = ?""", (channel, channel))
|
||||
if calls == []: return
|
||||
async def send_to(call):
|
||||
if call["from_channel"] == channel:
|
||||
other_channel, other_webhook = call["to_channel"], call["to_webhook"]
|
||||
else:
|
||||
other_channel, other_webhook = call["from_channel"], call["from_webhook"]
|
||||
|
||||
async def send_normal_message():
|
||||
m = f"**{message.author.name}**: "
|
||||
m += message.content[:2000 - len(m)]
|
||||
await bot.get_channel(other_channel).send(m)
|
||||
|
||||
if other_webhook:
|
||||
try:
|
||||
await discord.Webhook.from_url(other_webhook, adapter=discord.AsyncWebhookAdapter(bot.http._HTTPClient__session)).send(
|
||||
content=message.content, username=message.author.name, avatar_url=message.author.avatar_url,
|
||||
allowed_mentions=discord.AllowedMentions(everyone=False, roles=False, users=False))
|
||||
except discord.errors.NotFound:
|
||||
logging.warn("channel %d webhook missing", other_channel)
|
||||
await send_normal_message()
|
||||
else: await send_normal_message()
|
||||
|
||||
await asyncio.gather(*map(send_to, calls))
|
||||
|
||||
@telephone.command()
|
||||
@commands.check(util.server_mod_check)
|
||||
async def setup(ctx):
|
||||
num = generate_address(ctx)
|
||||
await ctx.send(f"Your address is {num}.")
|
||||
info = await get_addr_config(num)
|
||||
webhook = None
|
||||
if info: webhook = info["webhook"]
|
||||
if not info or not webhook:
|
||||
try:
|
||||
webhook = (await ctx.channel.create_webhook(name="incoming message display", reason="configure for apiotelephone")).url
|
||||
await ctx.send("Created webhook.")
|
||||
except discord.Forbidden as f:
|
||||
logging.warn("could not create webhook in #%s %s", ctx.channel.name, ctx.guild.name, exc_info=f)
|
||||
await ctx.send("Webhook creation failed - please ensure permissions are available. This is not necessary but is recommended.")
|
||||
await bot.database.execute("INSERT OR REPLACE INTO telephone_config VALUES (?, ?, ?, ?)", (num, ctx.guild.id, ctx.channel.id, webhook))
|
||||
await bot.database.commit()
|
||||
await ctx.send("Configured.")
|
||||
|
||||
@telephone.command(aliases=["call"])
|
||||
async def dial(ctx, address):
|
||||
# basic checks - ensure this is a phone channel and has no other open calls
|
||||
channel_info = await get_channel_config(ctx.channel.id)
|
||||
if not channel_info: return await ctx.send(embed=util.error_embed("Not in a phone channel."))
|
||||
originating_address = channel_info["id"]
|
||||
if address == originating_address: return await ctx.send(embed=util.error_embed("A channel cannot dial itself. That means *you*, Gibson."))
|
||||
recv_info = await get_addr_config(address)
|
||||
if not recv_info: return await ctx.send(embed=util.error_embed("Destination address not found. Please check for typos and/or antimemes."))
|
||||
|
||||
current_call = await bot.database.execute_fetchone("SELECT * FROM calls WHERE from_id = ?", (originating_address,))
|
||||
if current_call: return await ctx.send(embed=util.error_embed(f"A call is already open (to {current_call['to_id']}) from this channel. Currently, only one outgoing call is permitted at a time."))
|
||||
|
||||
# post embed in the receiving channel prompting people to accept/decline call
|
||||
recv_channel = bot.get_channel(recv_info["channel_id"])
|
||||
_, call_message = await asyncio.gather(
|
||||
ctx.send(embed=util.info_embed("Outgoing call", f"Dialing {address}...")),
|
||||
recv_channel.send(embed=util.info_embed("Incoming call",
|
||||
f"Call from {originating_address}. Click :white_check_mark: to accept or :negative_squared_cross_mark: to decline."))
|
||||
)
|
||||
# add clickable reactions to it
|
||||
await asyncio.gather(
|
||||
call_message.add_reaction("✅"),
|
||||
call_message.add_reaction("❎")
|
||||
)
|
||||
|
||||
def check(re, u): return (str(re.emoji) == "✅" or str(re.emoji) == "❎") and u != bot.user
|
||||
|
||||
reaction = None
|
||||
# wait until someone clicks the reactions, or time out and say so
|
||||
try:
|
||||
reaction, user = await bot.wait_for("reaction_add", timeout=util.config["call_timeout"], check=check)
|
||||
except asyncio.TimeoutError:
|
||||
await asyncio.gather(
|
||||
ctx.send(embed=util.error_embed("Timed out", "Outgoing call timed out - the other end did not pick up.")),
|
||||
recv_channel.send(embed=util.error_embed("Timed out", "Call timed out - no response in time"))
|
||||
)
|
||||
|
||||
await asyncio.gather(call_message.remove_reaction("✅", bot.user), call_message.remove_reaction("❎", bot.user))
|
||||
em = str(reaction.emoji) if reaction else "❎"
|
||||
if em == "✅": # accept call
|
||||
await bot.database.execute("INSERT INTO calls VALUES (?, ?, ?)", (originating_address, address, util.timestamp()))
|
||||
await bot.database.commit()
|
||||
await asyncio.gather(
|
||||
ctx.send(embed=util.info_embed("Outgoing call", "Call accepted and connected.")),
|
||||
recv_channel.send(embed=util.info_embed("Incoming call", "Call accepted and connected."))
|
||||
)
|
||||
elif em == "❎": # drop call
|
||||
await ctx.send(embed=util.error_embed("Your call was declined.", "Call declined"))
|
||||
|
||||
async def get_calls(addr):
|
||||
pass
|
||||
|
||||
@telephone.command(aliases=["disconnect", "quit"])
|
||||
async def hangup(ctx):
|
||||
channel_info = await get_channel_config(ctx.channel.id)
|
||||
addr = channel_info["id"]
|
||||
if not channel_info: return await ctx.send(embed=util.error_embed("Not in a phone channel."))
|
||||
from_here = await bot.database.execute_fetchone("SELECT * FROM calls WHERE from_id = ?", (addr,))
|
||||
to_here = await bot.database.execute_fetchone("SELECT * FROM calls WHERE to_id = ?", (addr,))
|
||||
if (not to_here) and (not from_here): return await ctx.send(embed=util.error_embed("No calls are active."))
|
||||
|
||||
other = None
|
||||
if from_here:
|
||||
other = from_here["to_id"]
|
||||
await bot.database.execute("DELETE FROM calls WHERE from_id = ? AND to_id = ?", (addr, other))
|
||||
elif to_here:
|
||||
other = to_here["from_id"]
|
||||
await bot.database.execute("DELETE FROM calls WHERE to_id = ? AND from_id = ?", (addr, other))
|
||||
await bot.database.commit()
|
||||
other_channel = (await get_addr_config(other))["channel_id"]
|
||||
|
||||
await asyncio.gather(
|
||||
ctx.send(embed=util.info_embed("Hung up", f"Call to {other} disconnected.")),
|
||||
bot.get_channel(other_channel).send(embed=util.info_embed("Hung up", f"Call to {addr} disconnected."))
|
||||
)
|
||||
|
||||
@telephone.command(aliases=["status"])
|
||||
async def info(ctx):
|
||||
channel_info = await get_channel_config(ctx.channel.id)
|
||||
if not channel_info: return await ctx.send(embed=util.info_embed("Phone status", "Not a phone channel"))
|
||||
addr = channel_info['id']
|
||||
title = f"{addr} status"
|
||||
|
||||
fields = []
|
||||
|
||||
now = datetime.utcnow()
|
||||
def delta(ts):
|
||||
return util.format_timedelta(datetime.utcfromtimestamp(ts), now)
|
||||
|
||||
incoming = await bot.database.execute_fetchall("SELECT * FROM calls WHERE to_id = ?", (addr,))
|
||||
fields.extend(map(lambda x: ["Incoming call", f"From {x['from_id']} - for {delta(x['start_time'])}"], incoming))
|
||||
outgoing = await bot.database.execute_fetchall("SELECT * FROM calls WHERE from_id = ?", (addr,))
|
||||
fields.extend(map(lambda x: ["Outgoing call", f"To {x['to_id']} - for {delta(x['start_time'])}"], outgoing))
|
||||
await ctx.send(embed=util.info_embed(title, f"Connected: {len(incoming) + len(outgoing)}", fields))
|
27
src/tio.py
27
src/tio.py
|
@ -1,27 +0,0 @@
|
|||
import pytio
|
||||
import http3
|
||||
import gzip
|
||||
import io
|
||||
|
||||
tio = pytio.Tio()
|
||||
|
||||
def languages():
|
||||
return tio.query_languages()
|
||||
|
||||
aliases = {
|
||||
"python": "python3",
|
||||
"javascript": "javascript-node"
|
||||
}
|
||||
|
||||
client = http3.AsyncClient()
|
||||
|
||||
async def run(lang, code):
|
||||
real_lang = aliases.get(lang, lang)
|
||||
req = pytio.TioRequest(real_lang, code)
|
||||
res = await client.post("https://tio.run/cgi-bin/run/api/", data=req.as_deflated_bytes(), timeout=65)
|
||||
content = res.content.decode("UTF-8")
|
||||
split = list(filter(lambda x: x != "\n" and x != "", content.split(content[:16])))
|
||||
if len(split) == 1:
|
||||
return False, real_lang, split[0], None
|
||||
else:
|
||||
return True, real_lang, split[0], split[1]
|
265
src/util.py
265
src/util.py
|
@ -1,265 +0,0 @@
|
|||
import re
|
||||
import datetime
|
||||
import parsedatetime
|
||||
import ast
|
||||
import copy
|
||||
import random
|
||||
from dateutil.relativedelta import relativedelta
|
||||
import json
|
||||
import discord
|
||||
import toml
|
||||
import os.path
|
||||
from discord.ext import commands
|
||||
import hashlib
|
||||
|
||||
config = {}
|
||||
|
||||
# update in place for runtime config reload
|
||||
def load_config():
|
||||
for k, v in toml.load(open("config.toml", "r")).items(): config[k] = v
|
||||
|
||||
load_config()
|
||||
|
||||
def timestamp(): return int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
|
||||
|
||||
prefixes = {
|
||||
# big SI prefixes
|
||||
"Y": 24, "Z": 21, "E": 18, "P": 15, "T": 12, "G": 9, "M": 6, "k": 3, "h": 2, "da": 1,
|
||||
# small SI prefixes
|
||||
"d": -1, "c": -2, "m": -3, "µ": -6, "μ": -6, "u": -6, "n": -9, "p": -12, "f": -15, "a": -18, "z": -21, "y": -24,
|
||||
# highly dubiously useful unofficial prefixes
|
||||
"R": 27, "r": -27, "Q": 30, "q": -30, "X": 27, "x": -27, "W": 30, "w": -30
|
||||
}
|
||||
number = "(-?[0-9]+(?:\.[0-9]+)?)(" + "|".join(prefixes.keys()) + ")?"
|
||||
|
||||
time_units = (
|
||||
("galacticyears", "cosmicyears", "gy", "[Cc]y"),
|
||||
("years", "y"),
|
||||
("beelifespans", "🐝", "bees?"),
|
||||
("months", "mo"),
|
||||
("semesters",),
|
||||
("fortnights", "ft?n?"),
|
||||
("weeks", "w"),
|
||||
("days", "d"),
|
||||
("hours", "h"),
|
||||
# Wikipedia tells me this is a traditional Chinese timekeeping unit
|
||||
("ke",),
|
||||
("minutes", "m"),
|
||||
("seconds", "s")
|
||||
)
|
||||
|
||||
tu_mappings = {
|
||||
# dateutil dislikes fractional years, but this is 250My
|
||||
"galacticyears": (7.8892315e15, "seconds"),
|
||||
# apparently the average lifespan of a Western honey bee - I'm not very sure whether this is workers/drones/queens or what so TODO
|
||||
"beelifespans": lambda: (random.randint(122, 152), "days"),
|
||||
"semesters": (18, "weeks"),
|
||||
"fortnights": (2, "weeks"),
|
||||
"ke": (864, "seconds")
|
||||
}
|
||||
|
||||
def rpartfor(u):
|
||||
if u[0][-1] == "s":
|
||||
l = [u[0] + "?"]
|
||||
l.extend(u[1:])
|
||||
else: l = u
|
||||
return f"(?:(?P<{u[0]}>{number})(?:{'|'.join(l)}))?"
|
||||
|
||||
short_timedelta_regex = re.compile("\n".join(map(rpartfor, time_units)), re.VERBOSE)
|
||||
|
||||
def parse_prefixed(s):
|
||||
match = re.match(number, s)
|
||||
if not match: raise ValueError("does not match metric-prefixed integer format - ensure prefix is valid")
|
||||
num = float(match.group(1))
|
||||
prefix = match.group(2)
|
||||
if prefix: num *= (10 ** prefixes[prefix])
|
||||
return num
|
||||
|
||||
def parse_short_timedelta(text):
|
||||
match = short_timedelta_regex.fullmatch(text)
|
||||
if match is None or not match.group(0): raise ValueError("parse failed")
|
||||
data = { k: parse_prefixed(v) if v else 0 for k, v in match.groupdict().items() }
|
||||
for tu, mapping in tu_mappings.items():
|
||||
if callable(mapping): mapping = mapping()
|
||||
qty, resunit = mapping
|
||||
data[resunit] += qty * data[tu]
|
||||
del data[tu]
|
||||
return datetime.datetime.now(tz=datetime.timezone.utc) + relativedelta(**data)
|
||||
|
||||
cal = parsedatetime.Calendar()
|
||||
def parse_humantime(text):
|
||||
dt_tuple = cal.nlp(text)
|
||||
if dt_tuple: return dt_tuple[0][0].replace(tzinfo=datetime.timezone.utc)
|
||||
else: raise ValueError("parse failed")
|
||||
|
||||
def parse_time(text):
|
||||
try: return datetime.datetime.strptime(text, "%d/%m/%Y").replace(tzinfo=datetime.timezone.utc)
|
||||
except: pass
|
||||
try: return parse_short_timedelta(text)
|
||||
except: pass
|
||||
try: return parse_humantime(text)
|
||||
except: pass
|
||||
raise ValueError("time matches no available format")
|
||||
|
||||
def format_time(dt):
|
||||
return dt.strftime("%H:%M:%S %d/%m/%Y")
|
||||
|
||||
timeparts = (
|
||||
("y", "years"),
|
||||
("mo", "months"),
|
||||
("d", "days"),
|
||||
("h", "hours"),
|
||||
("m", "minutes"),
|
||||
("s", "seconds")
|
||||
)
|
||||
|
||||
def format_timedelta(from_, to):
|
||||
d = relativedelta(to, from_)
|
||||
out = ""
|
||||
for short, attr in timeparts:
|
||||
x = getattr(d, attr)
|
||||
if x != 0: out += str(x) + short
|
||||
return "0s" if out == "" else out
|
||||
|
||||
CODEBLOCK_REGEX = "^[^`]*```[a-zA-Z0-9_\-+]*\n(.+)```$"
|
||||
CODELINE_REGEX = "^[^`]*`(.*)`$"
|
||||
def extract_codeblock(s):
|
||||
match1 = re.match(CODEBLOCK_REGEX, s, flags=re.DOTALL)
|
||||
match2 = re.match(CODELINE_REGEX, s, flags=re.DOTALL)
|
||||
if match1: return match1.group(1)
|
||||
elif match2: return match2.group(1)
|
||||
else: return s.strip()
|
||||
|
||||
# from https://github.com/Gorialis/jishaku/blob/master/jishaku/repl/compilation.py
|
||||
CORO_CODE = """
|
||||
async def repl_coroutine():
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
"""
|
||||
async def async_exec(code, loc, glob):
|
||||
user_code = ast.parse(code, mode='exec')
|
||||
wrapper = ast.parse(CORO_CODE, mode='exec')
|
||||
funcdef = wrapper.body[-1]
|
||||
funcdef.body.extend(user_code.body)
|
||||
last_expr = funcdef.body[-1]
|
||||
|
||||
if isinstance(last_expr, ast.Expr):
|
||||
funcdef.body.pop()
|
||||
funcdef.body.append(ast.Return(last_expr.value))
|
||||
ast.fix_missing_locations(wrapper)
|
||||
|
||||
exec(compile(wrapper, "<repl>", "exec"), loc, glob)
|
||||
return await (loc.get("repl_coroutine") or glob.get("repl_coroutine"))()
|
||||
|
||||
def make_embed(*, fields=(), footer_text=None, **kwargs):
|
||||
embed = discord.Embed(**kwargs)
|
||||
for field in fields:
|
||||
if len(field) > 2:
|
||||
embed.add_field(name=field[0], value=field[1], inline=field[2])
|
||||
else:
|
||||
embed.add_field(name=field[0], value=field[1], inline=False)
|
||||
if footer_text:
|
||||
embed.set_footer(text=footer_text)
|
||||
return embed
|
||||
|
||||
def error_embed(msg, title="Error"): return make_embed(color=config["colors"]["error"], description=msg, title=title)
|
||||
def info_embed(title, msg, fields=()): return make_embed(color=config["colors"]["info"], description=msg, title=title, fields=fields)
|
||||
|
||||
# https://github.com/LyricLy/Esobot/blob/bcc9e548c84ea9b23fc832d0b0aaa8288de64886/cogs/general.py
|
||||
lyrictable_raw = {
|
||||
"a": "а",
|
||||
"c": "с",
|
||||
"e": "е",
|
||||
"s": "ѕ",
|
||||
"i": "і",
|
||||
"j": "ј",
|
||||
"o": "о",
|
||||
"p": "р",
|
||||
"y": "у",
|
||||
"x": "х"
|
||||
}
|
||||
lyrictable = str.maketrans({v: k for k, v in lyrictable_raw.items()})
|
||||
|
||||
apioprefixes = ["cryo", "meta", "chrono", "contra", "ortho", "macro", "micro", "apeiro", "Ægypto", "equi", "anglo", "atto", "auro", "Australo"
|
||||
"dys", "eu", "femto", "giga", "infra", "Inver", "kilo", "meso", "mono", "nano", "neo", "omni", "pico", "proto", "pseudo", "semi", "quasi",
|
||||
"Scando", "silico", "sub", "hyper", "super", "tauto", "topo", "trans", "ultra", "uni", "ur-", "yocto", "zepto", "zetta"]
|
||||
apioinfixes = ["cryo", "pyro", "chrono", "meta", "anarcho", "arachno", "aqua", "accelero", "hydro", "radio", "xeno", "morto", "thanato", "memeto",
|
||||
"contra", "umbra", "macrono", "acantho", "acousto", "aceto", "acro", "aeolo", "hexa", "aero", "aesthio", "agro", "ferro", "alumino",
|
||||
"ammonio", "anti", "ankylo", "aniso", "annulo", "apo", "abio", "archeo", "argento", "arseno", "arithmo", "astro", "atlo", "auto", "axo",
|
||||
"azido", "bacillo", "bario", "balneo", "baryo", "basi", "benzo", "bismuto", "boreo", "biblio", "spatio", "boro", "bromo", "brachio",
|
||||
"bryo", "bronto", "calci", "caco", "carbo", "cardio", "cata", "iso", "centi", "ceno", "centro", "cero", "chalco", "chemo", "chloro",
|
||||
"chiono", "choano", "choro", "chromato", "chromo", "chryso", "chylo", "cine", "circum", "cirro", "climo", "cobalti", "coeno", "conico",
|
||||
"cono", "cortico", "cosmo", "crypto", "crano", "crystallo", "cyano", "cyber", "cyclo", "deca", "dendro", "cyno", "dactylo", "poly", "deutero",
|
||||
"dia", "digi", "diplo", "docosa", "disto", "dromo", "duo", "dynamo", "econo", "ecclesio", "echino", "eco", "ecto", "electro", "eigen", "eka",
|
||||
"elasto", "eicosa", "enviro", "enantio", "endo", "exo", "oeno", "femto", "ergato", "ergo", "etho", "euryo", "extro", "fluoro", "fructo",
|
||||
"galacto", "galvano", "glacio", "gibi", "glosso", "gluco", "glyco", "grammatico", "grapho", "gravi", "gyro", "hadro", "halo", "hapto", "hecto",
|
||||
"heli", "helio", "helico", "historio", "holo", "hella", "hemi", "hepta", "herpeto", "hiero", "hippo", "homo", "hoplo", "horo", "hyalo", "hyeto",
|
||||
"hygro", "hylo", "hypho", "hypno", "hypso", "iatro", "icthyo", "ichno", "icosa", "ideo", "idio", "imido", "info", "infra", "insta", "inter",
|
||||
"intro", "iodo", "iono", "irid", "iri", "iridio", "kilo", "diago", "juxta", "juridico", "bureaucrato", "entropo", "karyo", "kineto", "klepto",
|
||||
"konio", "kymo", "lamino", "leipdo", "lepto", "levo", "dextro", "lexico", "cognito", "ligno", "limno", "lipo", "litho", "logo", "magneto",
|
||||
"magnesio", "mega", "mento", "mercurio", "metallo", "mechano", "meco", "medio", "melo", "mero", "meso", "meteoro", "metro", "micto",
|
||||
"mono", "miso", "mnemo", "morpho", "myco", "myo", "myria", "mytho", "nano", "necro", "neo", "neutro", "neuro", "nitro", "nycto", "nucleo",
|
||||
"narco", "noto", "octo", "ochlo", "odonto", "oculo", "oligo", "opto", "organo", "ornitho", "osmio", "oneiro", "onto", "oxalo", "pachy",
|
||||
"paleo", "pali", "pallado", "pano", "para", "penta", "per", "patho", "pebi", "peloro", "pene", "petro", "pharma", "pheno", "philo", "pico",
|
||||
"piezo", "phono", "photo", "phospho", "physio", "physico", "phyto", "pico", "post", "pisci", "placo", "platy", "pleo", "plumbo", "pluto",
|
||||
"pneumato", "politico", "proto", "potassio", "proteo", "pseudo", "psycho", "ptero", "pykno", "quasi", "quadri", "recti", "retino", "retro",
|
||||
"rheo", "rhino", "rhizo", "rhodo", "roto", "rutheno", "saccharo", "sapo", "sauro", "seismo", "seleno", "septa", "silico", "scoto", "semanto",
|
||||
"sialo", "socio", "sodio", "skeleto", "somato", "somno", "sono", "spectro", "speleo", "sphero", "spino", "spiro", "sporo", "stanno", "stato",
|
||||
"steno", "stereo", "stegano", "strato", "hyper", "sulpho", "telluro", "stygo", "tachy", "tauto", "taxo", "techno", "tecto", "tele", "teleo",
|
||||
"temporo", "tera", "tetra", "thalasso", "thaumato", "thermo", "tephro", "tessera", "thio", "titano", "tomo", "topo", "tono", "tungsto",
|
||||
"turbo", "tyranno", "ultra", "undeca", "tribo", "trito", "tropho", "tropo", "uni", "urano", "video", "viro", "visuo", "xantho", "xenna",
|
||||
"xeri", "xipho", "xylo", "xyro", "yocto", "yttro", "zepto", "zetta", "zinco", "zirco", "zoo", "zono", "zygo", "templateo", "rustaceo", "mnesto",
|
||||
"amnesto", "cetaceo", "anthropo", "ioctlo"]
|
||||
apiosuffixes = ["hazard", "form"]
|
||||
|
||||
def apioform():
|
||||
out = ""
|
||||
if random.randint(0, 4) == 0:
|
||||
out += random.choice(apioprefixes)
|
||||
out += "apio"
|
||||
i = 1
|
||||
while True:
|
||||
out += random.choice(apioinfixes)
|
||||
if random.randint(0, i) > 0: break
|
||||
i *= 2
|
||||
out += random.choice(apiosuffixes)
|
||||
return out
|
||||
|
||||
def unlyric(text):
|
||||
return text.translate(lyrictable).replace("\u200b", "")
|
||||
|
||||
def gen_codeblock(content):
|
||||
return "```\n" + content.replace("```", "\\`\\`\\`")[:1900] + "\n```"
|
||||
|
||||
def json_encode(x): return json.dumps(x, separators=(',', ':'))
|
||||
|
||||
async def server_mod_check(ctx):
|
||||
return ctx.author.permissions_in(ctx.channel).manage_channels or (await ctx.bot.is_owner(ctx.author))
|
||||
|
||||
async def admin_check(ctx):
|
||||
return await ctx.bot.is_owner(ctx.author)
|
||||
|
||||
async def get_asset(bot: commands.Bot, identifier):
|
||||
safe_ident = re.sub("[^A-Za-z0-9_.-]", "_", identifier)
|
||||
x = await bot.database.execute_fetchone("SELECT * FROM assets WHERE identifier = ?", (safe_ident,))
|
||||
if x:
|
||||
return x["url"]
|
||||
file = discord.File(os.path.join("./assets", identifier), filename=safe_ident)
|
||||
message = await (bot.get_channel(config["image_upload_channel"])).send(identifier, file=file)
|
||||
url = message.attachments[0].proxy_url
|
||||
await bot.database.execute("INSERT INTO assets VALUES (?, ?)", (safe_ident, url))
|
||||
return url
|
||||
|
||||
def hashbow(thing):
|
||||
return int.from_bytes(hashlib.blake2b(thing.encode("utf-8")).digest()[:3], "little")
|
||||
|
||||
extensions = (
|
||||
"reminders",
|
||||
"debug",
|
||||
"telephone",
|
||||
"achievement",
|
||||
"heavserver",
|
||||
"voice"
|
||||
)
|
49
src/voice.py
49
src/voice.py
|
@ -1,49 +0,0 @@
|
|||
import util
|
||||
import discord
|
||||
from discord.oggparse import OggStream
|
||||
# requests is for synchronous HTTP. This would be quite awful to use in the rest of the code, but is probably okay since voice code is in a separate thread
|
||||
# This should, arguably, run in a separate process given python's GIL etc. but this would involve significant effort probably
|
||||
import requests
|
||||
import io
|
||||
from discord.ext import commands
|
||||
|
||||
|
||||
class HTTPSource(discord.AudioSource):
|
||||
def __init__(self, url):
|
||||
self.url = url
|
||||
async def start(self):
|
||||
bytestream = requests.get(self.url, stream=True).raw
|
||||
self.packets = OggStream(io.BufferedReader(bytestream, buffer_size=2**10)).iter_packets()
|
||||
def read(self): return next(self.packets, b"")
|
||||
def is_opus(self): return True
|
||||
|
||||
def setup(bot):
|
||||
# experimental, thus limit to me only
|
||||
@bot.group()
|
||||
@commands.check(util.admin_check)
|
||||
async def radio(ctx): pass
|
||||
|
||||
@radio.command()
|
||||
async def connect(ctx, thing="main"):
|
||||
voice = ctx.author.voice
|
||||
if not voice: return await ctx.send(embed=util.error_embed("You are not in a voice channel."))
|
||||
if voice.mute: return await ctx.send(embed=util.error_embed("You are muted."))
|
||||
thing_url = util.config["radio_urls"].get(thing, None)
|
||||
if thing_url == None: return await ctx.send(embed=util.error_embed("No such radio thing."))
|
||||
existing = ctx.guild.voice_client
|
||||
if existing: await existing.disconnect()
|
||||
vc = await voice.channel.connect()
|
||||
src = HTTPSource(thing_url)
|
||||
await src.start()
|
||||
vc.play(src)
|
||||
|
||||
@radio.command()
|
||||
async def disconnect(ctx):
|
||||
if ctx.guild.voice_client:
|
||||
ctx.guild.voice_client.stop()
|
||||
await ctx.guild.voice_client.disconnect()
|
||||
|
||||
def teardown(bot):
|
||||
for guild in bot.guilds:
|
||||
if guild.voice_client:
|
||||
guild.voice_client.stop()
|
Loading…
Reference in New Issue
Block a user