1
0
mirror of https://github.com/osmarks/autobotrobot synced 2024-06-22 21:13:17 +00:00
autobotrobot/src/main.py

198 lines
7.5 KiB
Python
Raw Normal View History

2019-10-21 20:07:17 +00:00
import discord
import toml
import logging
import subprocess
import discord.ext.commands as commands
2020-05-12 16:08:03 +00:00
import discord.ext.tasks as tasks
2019-10-21 20:07:17 +00:00
import re
import asyncio
import json
2020-04-18 12:14:31 +00:00
import argparse
from datetime import timezone, datetime
2019-10-21 20:07:17 +00:00
import tio
2020-04-18 12:14:31 +00:00
import db
2020-05-12 16:08:03 +00:00
import util
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
def timestamp():
    """Return the current time as whole seconds since the Unix epoch (UTC)."""
    now_utc = datetime.now(timezone.utc)
    return int(now_utc.timestamp())
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
# TODO refactor this
# Module-level database handle shared by the commands below.
# It stays None until run_bot() assigns it the result of db.init().
database = None
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
# Bot settings (token, database location, embed colours) are read once at import time.
# The file is opened in a `with` block so the handle is closed as soon as parsing is done;
# TOML documents are UTF-8 by specification, so the encoding is stated explicitly.
with open("config.toml", "r", encoding="utf-8") as config_file:
    config = toml.load(config_file)
2019-10-21 20:07:17 +00:00
# Log INFO and above; each record is "<level> <H:M:S d/m/Y> <message>".
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(message)s", datefmt="%H:%M:%S %d/%m/%Y")
# Commands are invoked with the "++" prefix and command names are matched case-insensitively.
bot = commands.Bot(command_prefix='++', description="AutoBotRobot, the most useless bot in the known universe.", case_insensitive=True)
# Replaces a private discord.py hook with one that always returns False.
# NOTE(review): presumably this stops the library from skipping messages by certain authors
# (e.g. the bot itself) — confirm against the installed discord.py version.
bot._skip_check = lambda x, y: False
2020-04-18 12:14:31 +00:00
def make_embed(*, fields=(), footer_text=None, **kwargs):
    """Build a discord.Embed from keyword arguments.

    fields: iterable of (name, value) or (name, value, inline) sequences; each
        becomes one embed field. Fields without a third element are not inline.
    footer_text: if truthy, used as the embed footer text.
    kwargs: passed straight through to discord.Embed (title, description, color, ...).

    Returns the constructed embed.
    """
    # The default for `fields` is an immutable tuple so no state can be shared between calls.
    embed = discord.Embed(**kwargs)
    for field in fields:
        inline = field[2] if len(field) > 2 else False
        embed.add_field(name=field[0], value=field[1], inline=inline)
    if footer_text:
        embed.set_footer(text=footer_text)
    return embed
def error_embed(msg, title="Error"):
    """Return an embed describing an error, coloured with the configured error colour."""
    error_color = config["colors"]["error"]
    return make_embed(title=title, description=msg, color=error_color)
2019-10-21 20:07:17 +00:00
# Single shared converter instance, reused for every call to clean().
cleaner = discord.ext.commands.clean_content()


def clean(ctx, text):
    """Run `text` through the shared clean_content converter for `ctx`.

    Returns whatever cleaner.convert returns; callers in this file await it.
    """
    pending = cleaner.convert(ctx, text)
    return pending
@bot.command(help="Gives you a random fortune as generated by `fortune`.")
async def fortune(ctx):
    """Run the external `fortune` program and send its output to the channel."""
    # Use asyncio's subprocess support rather than subprocess.run: a blocking
    # call inside a coroutine would stall the whole event loop (and every other
    # command) until the child process exits.
    process = await asyncio.create_subprocess_exec("fortune", stdout=asyncio.subprocess.PIPE)
    stdout, _ = await process.communicate()
    await ctx.send(stdout.decode("UTF-8"))
@bot.command(help="Says Pong.")
async def ping(ctx):
    """Reply with a fixed "Pong." message."""
    reply = "Pong."
    await ctx.send(reply)
@bot.command(help="Deletes the specified target.", rest_is_raw=True)
async def delete(ctx, *, raw_target):
    """Pretend to delete `raw_target` and record it in the deleted_items table.

    The target is trimmed, flattened to a single line and cleaned before use;
    targets longer than 256 characters are rejected with an error embed.
    """
    single_line = raw_target.strip().replace("\n", " ")
    target = await clean(ctx, single_line)
    if len(target) > 256:
        await ctx.send(embed=error_embed("Deletion target must be max 256 chars"))
        return

    async with ctx.typing():
        await ctx.send(f"Deleting {target}...")
        await asyncio.sleep(1)
        insert_params = (timestamp(), target)
        await database.execute("INSERT INTO deleted_items (timestamp, item) VALUES (?, ?)", insert_params)
        await database.commit()
        await ctx.send(f"Deleted {target} successfully.")
@bot.command(help="View recently deleted things, optionally matching a filter.")
async def list_deleted(ctx, search=None):
    """Send a list of the most recently deleted items, newest first.

    search: optional substring; when given, only items containing it are listed.
    At most 100 rows are read, and the listing stops before it would exceed
    2000 characters.
    """
    if search:
        listing = f"Recently deleted (matching {search}):\n"
        query = database.execute("SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100", (f"%{search}%",))
    else:
        listing = "Recently deleted:\n"
        query = database.execute("SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100")

    async with query as cursor:
        async for row in cursor:
            # Column index 2 is read as the item text.
            entry = "- " + row[2] + "\n"
            if len(listing) + len(entry) > 2000:
                break
            listing += entry
    await ctx.send(listing)
2020-04-18 12:14:31 +00:00
# Python, for some *very intelligent reason*, makes the default ArgumentParser exit the program on error.
# This is obviously undesirable behavior in a Discord bot, so we override this.
class NonExitingArgumentParser(argparse.ArgumentParser):
    """ArgumentParser that raises an exception instead of terminating the process."""

    def exit(self, status=0, message=None):
        """Raise instead of exiting.

        argparse calls this from error() (non-zero status) and from actions
        such as help/version (zero status). Both cases raise Exception here:
        the previous zero-status path called the interactive `exit()` helper,
        which raises SystemExit and would escape the bot's command handling.

        Raises:
            Exception: always; the text starts with "Flag parse error: " when
                `status` is non-zero.
        """
        if status:
            raise Exception(f'Flag parse error: {message}')
        raise Exception(message or "Flag parsing stopped before completion")
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
# Pattern for the argument of the exec command: optional text, then a fenced codeblock
# with an optional language tag on the opening fence. The exec command applies it with re.DOTALL.
# Group 1: text before the codeblock (parsed as flags); group 2: language tag, if any; group 3: the code.
EXEC_REGEX = "^(.*)```([a-zA-Z0-9_\\-+]+)?\n(.*)```$"
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
# Parser for the flags that may precede the codeblock in the exec command.
# add_help=False: no automatic -h/--help option is registered.
exec_flag_parser = NonExitingArgumentParser(add_help=False)
# -v / --verbose: boolean switch; when set, exec appends debug output and the resolved language.
exec_flag_parser.add_argument("--verbose", "-v", action="store_true")
# -L / --language: language name; takes precedence over the codeblock's language tag.
exec_flag_parser.add_argument("--language", "-L")
2019-10-21 20:07:17 +00:00
@bot.command(rest_is_raw=True, help="Execute provided code (in a codeblock) using TIO.run.")
async def exec(ctx, *, arg):
    """Run the code in the supplied codeblock via tio.run and send back the result.

    arg: raw command text, expected to match EXEC_REGEX (optional flags, then a
        fenced codeblock). The language comes from -L/--language or, failing
        that, from the codeblock's language tag.

    Sends an error embed when the format is wrong, the flags cannot be parsed,
    no language is given, or execution fails. Otherwise sends the output,
    limited to 2000 characters; with -v/--verbose the debug block is appended.
    """
    match = re.match(EXEC_REGEX, arg, flags=re.DOTALL)
    if match is None:
        await ctx.send(embed=error_embed("Invalid format. Expected a codeblock."))
        return

    try:
        flags = exec_flag_parser.parse_args(match.group(1).split())
    except Exception as e:
        # The parser signals bad flags with a plain Exception; report it to the
        # user instead of letting it escape the command.
        await ctx.send(embed=error_embed(str(e)))
        return

    lang = flags.language or match.group(2)
    if not lang:
        await ctx.send(embed=error_embed("No language specified. Use the -L flag or add a language to your codeblock."))
        return
    lang = lang.strip()
    code = match.group(3)

    async with ctx.typing():
        ok, real_lang, result, debug = await tio.run(lang, code)
        if not ok:
            await ctx.send(embed=error_embed(f"""```{result}```""", "Execution error"))
            return

        if flags.verbose:
            debug_block = f"""\n```{debug}\nLanguage: {real_lang}```"""
            # Never let the slice bound go negative when the debug block alone
            # exceeds the limit, and keep the combined message within 2000 characters.
            room = max(0, 2000 - len(debug_block))
            out = (result[:room] + debug_block)[:2000]
        else:
            out = result[:2000]
        await ctx.send(out)
@bot.command(help="List supported languages, optionally matching a filter.")
async def supported_langs(ctx, search=None):
    """Send the sorted names from tio.languages(), split into messages of at most 2000 characters.

    search: optional substring; when given, only names containing it are listed.
    Sends "No results." only when nothing matched at all.
    """
    matching = [lang for lang in sorted(tio.languages()) if search is None or search in lang]
    if not matching:
        await ctx.send("No results.")
        return

    acc = ""
    for lang in matching:
        entry = lang + " "
        # Count the separator too, so a chunk can never grow past the limit.
        if acc and len(acc) + len(entry) > 2000:
            await ctx.send(acc)
            acc = ""
        acc += entry
    await ctx.send(acc)
2020-05-12 16:08:03 +00:00
@bot.command(help="Set a reminder. All times are UTC. Reminders are only checked once per minute.", rest_is_raw=True)
async def remind(ctx, time, *, reminder):
    """Store a reminder in the reminders table and confirm the scheduled time.

    time: time specification, parsed by util.parse_time.
    reminder: reminder text; trimmed, and rejected if longer than 512 characters.
    """
    reminder = reminder.strip()
    if len(reminder) > 512:
        await ctx.send(embed=error_embed("Maximum reminder length is 512 characters"))
        return

    # Captured before parsing so the original time spec is stored as typed.
    metadata = {
        "author_id": ctx.author.id,
        "channel_id": ctx.message.channel.id,
        "original_time_spec": time
    }
    remind_at = util.parse_time(time)
    row = (remind_at.timestamp(), timestamp(), reminder, 0, json.dumps(metadata))
    await database.execute("INSERT INTO reminders (remind_timestamp, created_timestamp, reminder, expired, extra) VALUES (?, ?, ?, ?, ?)", row)
    await database.commit()
    await ctx.send(f"Reminder scheduled for {util.format_time(remind_at)}.")
@tasks.loop(seconds=60)
async def remind_worker():
    """Deliver every due, unexpired reminder and mark the delivered ones as expired.

    A reminder that cannot be sent is logged and left unexpired, so it is
    picked up again on a later run.
    """
    due_query = database.execute("SELECT * FROM reminders WHERE expired = 0 AND remind_timestamp < ?", (timestamp(),))
    delivered_ids = []
    async with due_query as cursor:
        async for record in cursor:
            rid, remind_timestamp, created_timestamp, reminder_text, _, extra = record
            try:
                # The converted due time is not used below; the conversion is kept
                # so an invalid value still fails this reminder.
                datetime.utcfromtimestamp(remind_timestamp)
                created_at = datetime.utcfromtimestamp(created_timestamp)
                metadata = json.loads(extra)
                author_id = metadata["author_id"]
                message = f"<@{author_id}> Reminder queued at {util.format_time(created_at)}: {reminder_text}"
                target_channel = bot.get_channel(metadata["channel_id"])
                await target_channel.send(message)
                delivered_ids.append(rid)
            except Exception as e:
                logging.warning("Could not send reminder %d", rid, exc_info=e)

    for delivered_id in delivered_ids:
        logging.info("Expiring reminder %d", delivered_id)
        await database.execute("UPDATE reminders SET expired = 1 WHERE id = ?", (delivered_id,))
    await database.commit()
@bot.event
async def on_ready():
    """Log the connection, set the bot's presence and make sure the reminder loop is running."""
    logging.info("Connected as %s", bot.user.name)
    await bot.change_presence(status=discord.Status.online, activity=discord.Activity(type=discord.ActivityType.listening, name="commands beginning with ++"))
    try:
        remind_worker.start()
    except RuntimeError:
        # on_ready may fire more than once (e.g. after a reconnect); starting a
        # loop that is already running raises RuntimeError, which is harmless here.
        logging.debug("Reminder worker already running")
2020-04-18 12:14:31 +00:00
async def run_bot():
    """Open the database, publish it as the module-level handle, then start the bot."""
    global database
    db_settings = config["database"]
    database = await db.init(db_settings)
    bot_token = config["token"]
    await bot.start(bot_token)
if __name__ == '__main__':
    # Run the bot until it stops; on Ctrl-C, stop the reminder loop and log out
    # cleanly. The event loop is closed in every case.
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(run_bot())
    except KeyboardInterrupt:
        remind_worker.cancel()
        event_loop.run_until_complete(bot.logout())
    finally:
        event_loop.close()