autobotrobot/src/main.py

153 lines
5.5 KiB
Python

import discord
import toml
import logging
import subprocess
import discord.ext.commands as commands
import re
import asyncio
import json
import argparse
from datetime import timezone, datetime
import tio
import db
def timestamp(): return int(datetime.now(tz=timezone.utc).timestamp())
# TODO refactor this
database = None
config = toml.load(open("config.toml", "r"))
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(message)s", datefmt="%H:%M:%S %d/%m/%Y")
bot = commands.Bot(command_prefix='++', description="AutoBotRobot, the most useless bot in the known universe.", case_insensitive=True)
bot._skip_check = lambda x, y: False
def make_embed(*, fields=[], footer_text=None, **kwargs):
embed = discord.Embed(**kwargs)
for field in fields:
if len(field) > 2:
embed.add_field(name=field[0], value=field[1], inline=field[2])
else:
embed.add_field(name=field[0], value=field[1], inline=False)
if footer_text:
embed.set_footer(text=footer_text)
return embed
def error_embed(msg, title="Error"):
return make_embed(color=config["colors"]["error"], description=msg, title=title)
cleaner = discord.ext.commands.clean_content()
def clean(ctx, text):
return cleaner.convert(ctx, text)
@bot.event
async def on_ready():
await bot.change_presence(status=discord.Status.online, activity=discord.Activity(type=discord.ActivityType.listening, name="commands beginning with ++"))
@bot.command(help="Gives you a random fortune as generated by `fortune`.")
async def fortune(ctx):
await ctx.send(subprocess.run(["fortune"], stdout=subprocess.PIPE, encoding="UTF-8").stdout)
@bot.command(help="Says Pong.")
async def ping(ctx):
await ctx.send("Pong.")
@bot.command(help="Deletes the specified target.", rest_is_raw=True)
async def delete(ctx, *, raw_target):
target = await clean(ctx, raw_target.strip().replace("\n", " "))
if len(target) > 256:
await ctx.send(embed=error_embed("Deletion target must be max 256 chars"))
return
async with ctx.typing():
await ctx.send(f"Deleting {target}...")
await asyncio.sleep(1)
await database.execute("INSERT INTO deleted_items (timestamp, item) VALUES (?, ?)", (timestamp(), target))
await database.commit()
await ctx.send(f"Deleted {target} successfully.")
@bot.command(help="View recently deleted things, optionally matching a filter.")
async def list_deleted(ctx, search=None):
acc = "Recently deleted:\n"
if search: acc = f"Recently deleted (matching {search}):\n"
csr = None
if search:
csr = database.execute("SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100", (f"%{search}%",))
else:
csr = database.execute("SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100")
async with csr as cursor:
async for row in cursor:
to_add = "- " + row[2] + "\n"
if len(acc + to_add) > 2000:
break
acc += to_add
await ctx.send(acc)
# Python, for some *very intelligent reason*, makes the default ArgumetParser exit the program on error.
# This is obviously undesirable behavior in a Discord bot, so we override this.
class NonExitingArgumentParser(argparse.ArgumentParser):
def exit(self, status=0, message=None):
if status:
raise Exception(f'Flag parse error: {message}')
exit(status)
EXEC_REGEX = "^(.*)```([a-zA-Z0-9_\\-+]+)?\n(.*)```$"
exec_flag_parser = NonExitingArgumentParser(add_help=False)
exec_flag_parser.add_argument("--verbose", "-v", action="store_true")
exec_flag_parser.add_argument("--language", "-L")
@bot.command(rest_is_raw=True, help="Execute provided code (in a codeblock) using TIO.run.")
async def exec(ctx, *, arg):
match = re.match(EXEC_REGEX, arg, flags=re.DOTALL)
if match == None:
await ctx.send(embed=error_embed("Invalid format. Expected a codeblock."))
return
flags_raw = match.group(1)
flags = exec_flag_parser.parse_args(flags_raw.split())
lang = flags.language or match.group(2)
if not lang:
await ctx.send(embed=error_embed("No language specified. Use the -L flag or add a language to your codeblock."))
return
lang = lang.strip()
code = match.group(3)
async with ctx.typing():
ok, real_lang, result, debug = await tio.run(lang, code)
if not ok:
await ctx.send(embed=error_embed(f"""```{result}```""", "Execution error"))
else:
out = result
if flags.verbose:
debug_block = f"""\n```{debug}\nLanguage: {real_lang}```"""
out = out[:2000 - len(debug_block)] + debug_block
else:
out = out[:2000]
await ctx.send(out)
@bot.command(help="List supported languages, optionally matching a filter.")
async def supported_langs(ctx, search=None):
langs = sorted(tio.languages())
acc = ""
for lang in langs:
if len(acc + lang) > 2000:
await ctx.send(acc)
acc = ""
if search == None or search in lang: acc += lang + " "
if acc == "": acc = "No results."
await ctx.send(acc)
async def run_bot():
global database
database = await db.init(config["database"])
await bot.start(config["token"])
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run_bot())
except KeyboardInterrupt:
loop.run_until_complete(bot.logout())
finally:
loop.close()