1
0
mirror of https://github.com/osmarks/autobotrobot synced 2025-02-12 06:40:10 +00:00

improve, deliver, create, develop, design, operate, manage, produce, modernize, complicate, nationalize, placate, evolve, alter, amend, change or obliterate code

This commit is contained in:
osmarks 2021-05-31 20:14:04 +01:00
parent c1b5007d33
commit d9c7df8d23
5 changed files with 55 additions and 49 deletions

View File

@ -45,7 +45,7 @@ async def achieve(bot: commands.Bot, message: discord.Message, achievement):
metrics.achievements_achieved.inc() metrics.achievements_achieved.inc()
await bot.database.execute("INSERT INTO achievements VALUES (?, ?, ?)", (uid, achievement, util.timestamp())) await bot.database.execute("INSERT INTO achievements VALUES (?, ?, ?)", (uid, achievement, util.timestamp()))
await bot.database.commit() await bot.database.commit()
logging.info("Awarded achievement %s to %s", message.author.name, achievement) logging.info("Awarded achievement %s to %s", achievement, message.author.name)
def setup(bot): def setup(bot):
@bot.group(name="achievements", aliases=["ach", "achieve", "achievement"], brief="Achieve a wide variety of fun achievements!", help=f""" @bot.group(name="achievements", aliases=["ach", "achieve", "achievement"], brief="Achieve a wide variety of fun achievements!", help=f"""

View File

@ -1,53 +1,59 @@
import subprocess
import asyncio import asyncio
import argparse import argparse
import random import random
from numpy.random import default_rng from numpy.random import default_rng
import re import re
import discord.ext.commands import aiohttp
import discord.ext.commands as commands
import tio import tio
import util import util
def setup(bot): cleaner = commands.clean_content()
cleaner = discord.ext.commands.clean_content() def clean(ctx, text):
def clean(ctx, text): return cleaner.convert(self, ctx, text)
return cleaner.convert(ctx, text)
@bot.command(help="Gives you a random fortune as generated by `fortune`.") class GeneralCommands(commands.Cog):
async def fortune(ctx): def __init__(self, bot):
await ctx.send(subprocess.run(["fortune"], stdout=subprocess.PIPE, encoding="UTF-8").stdout) self.bot = bot
self.session = aiohttp.ClientSession()
@bot.command(help="Generates an apioform type.") @commands.command(help="Gives you a random fortune as generated by `fortune`.")
async def apioform(ctx): async def fortune(self, ctx):
proc = await asyncio.create_subprocess_exec("fortune", stdout=subprocess.PIPE)
stdout = (await proc.communicate()).decode("utf-8")
await ctx.send(stdout)
@commands.command(help="Generates an apioform type.")
async def apioform(self, ctx):
await ctx.send(util.apioform()) await ctx.send(util.apioform())
@bot.command(help="Says Pong.") @commands.command(help="Says Pong.")
async def ping(ctx): async def ping(self, ctx):
await ctx.send("Pong.") await ctx.send("Pong.")
@bot.command(help="Deletes the specified target.", rest_is_raw=True) @commands.command(help="Deletes the specified target.", rest_is_raw=True)
async def delete(ctx, *, raw_target): async def delete(self, ctx, *, raw_target):
target = await clean(ctx, raw_target.strip().replace("\n", " ")) target = await clean(self, ctx, raw_target.strip().replace("\n", " "))
if len(target) > 256: if len(target) > 256:
await ctx.send(embed=util.error_embed("Deletion target must be max 256 chars")) await ctx.send(embed=util.error_embed("Deletion target must be max 256 chars"))
return return
async with ctx.typing(): async with ctx.typing():
await ctx.send(f"Deleting {target}...") await ctx.send(f"Deleting {target}...")
await asyncio.sleep(1) await asyncio.sleep(1)
await bot.database.execute("INSERT INTO deleted_items (timestamp, item) VALUES (?, ?)", (util.timestamp(), target)) await self.bot.database.execute("INSERT INTO deleted_items (timestamp, item) VALUES (?, ?)", (util.timestamp(), target))
await bot.database.commit() await self.bot.database.commit()
await ctx.send(f"Deleted {target} successfully.") await ctx.send(f"Deleted {target} successfully.")
@bot.command(help="View recently deleted things, optionally matching a filter.") @commands.command(help="View recently deleted things, optionally matching a filter.")
async def list_deleted(ctx, search=None): async def list_deleted(self, ctx, search=None):
acc = "Recently deleted:\n" acc = "Recently deleted:\n"
if search: acc = f"Recently deleted (matching {search}):\n" if search: acc = f"Recently deleted (matching {search}):\n"
csr = None csr = None
if search: if search:
csr = bot.database.execute("SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100", (f"%{search}%",)) csr = self.bot.database.execute("SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100", (f"%{search}%",))
else: else:
csr = bot.database.execute("SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100") csr = self.bot.database.execute("SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100")
async with csr as cursor: async with csr as cursor:
async for row in cursor: async for row in cursor:
to_add = "- " + row[2].replace("```", "[REDACTED]") + "\n" to_add = "- " + row[2].replace("```", "[REDACTED]") + "\n"
@ -70,8 +76,8 @@ def setup(bot):
exec_flag_parser.add_argument("--verbose", "-v", action="store_true") exec_flag_parser.add_argument("--verbose", "-v", action="store_true")
exec_flag_parser.add_argument("--language", "-L") exec_flag_parser.add_argument("--language", "-L")
@bot.command(rest_is_raw=True, help="Execute provided code (in a codeblock) using TIO.run.") @commands.command(rest_is_raw=True, help="Execute provided code (in a codeblock) using TIO.run.")
async def exec(ctx, *, arg): async def exec(self, ctx, *, arg):
match = re.match(EXEC_REGEX, arg, flags=re.DOTALL) match = re.match(EXEC_REGEX, arg, flags=re.DOTALL)
if match == None: if match == None:
await ctx.send(embed=util.error_embed("Invalid format. Expected a codeblock.")) await ctx.send(embed=util.error_embed("Invalid format. Expected a codeblock."))
@ -86,7 +92,7 @@ def setup(bot):
code = match.group(3) code = match.group(3)
async with ctx.typing(): async with ctx.typing():
ok, real_lang, result, debug = await tio.run(lang, code) ok, real_lang, result, debug = await tio.run(self.session, lang, code)
if not ok: if not ok:
await ctx.send(embed=util.error_embed(util.gen_codeblock(result), "Execution failed")) await ctx.send(embed=util.error_embed(util.gen_codeblock(result), "Execution failed"))
else: else:
@ -98,9 +104,9 @@ def setup(bot):
out = out[:2000] out = out[:2000]
await ctx.send(out) await ctx.send(out)
@bot.command(help="List supported languages, optionally matching a filter.") @commands.command(help="List supported languages, optionally matching a filter.")
async def supported_langs(ctx, search=None): async def supported_langs(self, ctx, search=None):
langs = sorted(tio.languages()) langs = sorted(await tio.languages(self.session))
acc = "" acc = ""
for lang in langs: for lang in langs:
if len(acc + lang) > 2000: if len(acc + lang) > 2000:
@ -110,18 +116,18 @@ def setup(bot):
if acc == "": acc = "No results." if acc == "": acc = "No results."
await ctx.send(acc) await ctx.send(acc)
@bot.command(help="Get some information about the bot.", aliases=["invite"]) @commands.command(help="Get some information about the bot.", aliases=["invite"])
async def about(ctx): async def about(self, ctx):
await ctx.send("""**AutoBotRobot: The least useful Discord bot ever designed.** await ctx.send("""**AutoBotRobot: The least useful Discord bot ever designed.**
AutoBotRobot has many features, but not necessarily any practical ones. AutoBotRobot has many features, but not necessarily any practical ones.
It can execute code via TIO.run, do reminders, print fortunes, and not any more! It can execute code via TIO.run, do reminders, print fortunes, bridge IRC, store data, and search things!
AutoBotRobot is open source - the code is available at <https://github.com/osmarks/autobotrobot> - and you could run your own instance if you wanted to and could get around the complete lack of user guide or documentation. AutoBotRobot is open source - the code is available at <https://github.com/osmarks/AutoBotRobot> - and you could run your own instance if you wanted to and could get around the complete lack of user guide or documentation.
You can also invite it to your server: <https://discordapp.com/oauth2/authorize?&client_id=509849474647064576&scope=bot&permissions=68608> You can also invite it to your server: <https://discordapp.com/oauth2/authorize?&client_id=509849474647064576&scope=bot&permissions=68608>
AutoBotRobot is operated by gollark/osmarks. AutoBotRobot is operated by gollark/osmarks.
""") """)
@bot.command(help="Roll simulated dice (basic NdX syntax, N <= 50, X <= 1e6).") @commands.command(help="Roll simulated dice (basic NdX syntax, N <= 50, X <= 1e6).")
async def roll(ctx, dice): async def roll(self, ctx, dice):
match = re.match("([-0-9]*)d([0-9]+)", dice) match = re.match("([-0-9]*)d([0-9]+)", dice)
if not match: raise ValueError("Invalid dice notation") if not match: raise ValueError("Invalid dice notation")
n, x = match.groups() n, x = match.groups()
@ -145,8 +151,8 @@ AutoBotRobot is operated by gollark/osmarks.
rng = default_rng() rng = default_rng()
@bot.command(help="'Randomly' choose between the specified options.", name="choice", aliases=["choose"]) @commands.command(help="'Randomly' choose between the specified options.", name="choice", aliases=["choose"])
async def random_choice(ctx, *choices): async def random_choice(self, ctx, *choices):
choices = list(choices) choices = list(choices)
samples = 1 samples = 1
# apparently doing typing.Optional[int] doesn't work properly with this, so just bodge around it # apparently doing typing.Optional[int] doesn't work properly with this, so just bodge around it
@ -169,3 +175,6 @@ AutoBotRobot is operated by gollark/osmarks.
results = map(lambda t: (choices[t[0]], t[1]), enumerate(rng.multinomial(samples, list(probabilities)))) results = map(lambda t: (choices[t[0]], t[1]), enumerate(rng.multinomial(samples, list(probabilities))))
await ctx.send("\n".join(map(lambda x: f"{x[0]} x{x[1]}", results))) await ctx.send("\n".join(map(lambda x: f"{x[0]} x{x[1]}", results)))
def setup(bot):
bot.add_cog(GeneralCommands(bot))

View File

@ -63,7 +63,7 @@ async def on_command_error(ctx, err):
await ctx.send(embed=util.error_embed(util.gen_codeblock(trace), title=f"Internal error in {ctx.invoked_with}")) await ctx.send(embed=util.error_embed(util.gen_codeblock(trace), title=f"Internal error in {ctx.invoked_with}"))
await achievement.achieve(ctx.bot, ctx.message, "error") await achievement.achieve(ctx.bot, ctx.message, "error")
except Exception as e: except Exception as e:
logging.exception("Error in error handling!", e) logging.exception("Error in command error handling.")
@bot.check @bot.check
async def andrew_bad(ctx): async def andrew_bad(ctx):
@ -107,8 +107,9 @@ async def run_bot():
if __name__ == "__main__": if __name__ == "__main__":
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.create_task(prometheus_async.aio.web.start_http_server(port=config["metrics_port"])) loop.create_task(prometheus_async.aio.web.start_http_server(port=config["metrics_port"]))
loop.create_task(run_bot())
try: try:
loop.run_until_complete(run_bot()) loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
loop.run_until_complete(bot.logout()) loop.run_until_complete(bot.logout())
sys.exit(0) sys.exit(0)

View File

@ -1,25 +1,21 @@
import pytio import pytio
import http3
import gzip import gzip
import io import io
tio = pytio.Tio() tio = pytio.Tio()
def languages(): async def languages(http_session):
return tio.query_languages() return await (await http_session.get("https://tio.run/languages.json")).json()
aliases = { aliases = {
"python": "python3", "python": "python3",
"javascript": "javascript-node" "javascript": "javascript-node"
} }
client = http3.AsyncClient() async def run(http_session, lang, code):
async def run(lang, code):
real_lang = aliases.get(lang, lang) real_lang = aliases.get(lang, lang)
req = pytio.TioRequest(real_lang, code) req = pytio.TioRequest(real_lang, code)
res = await client.post("https://tio.run/cgi-bin/run/api/", data=req.as_deflated_bytes(), timeout=65) res = await (await http_session.post("https://tio.run/cgi-bin/run/api/", data=req.as_deflated_bytes(), timeout=65)).text()
content = res.content.decode("UTF-8")
split = list(filter(lambda x: x != "\n" and x != "", content.split(content[:16]))) split = list(filter(lambda x: x != "\n" and x != "", content.split(content[:16])))
if len(split) == 1: if len(split) == 1:
return False, real_lang, split[0], None return False, real_lang, split[0], None

View File

@ -18,7 +18,7 @@ config = {}
# update in place for runtime config reload # update in place for runtime config reload
def load_config(): def load_config():
for k, v in toml.load(open("config.toml", "r")).items(): config[k] = v for k, v in toml.load(open(os.path.join(os.path.dirname(__file__), "../config.toml"), "r")).items(): config[k] = v
load_config() load_config()