2019-10-21 20:07:17 +00:00
import discord
import toml
import logging
import subprocess
import discord . ext . commands as commands
2020-05-12 16:08:03 +00:00
import discord . ext . tasks as tasks
2019-10-21 20:07:17 +00:00
import re
import asyncio
import json
2020-04-18 12:14:31 +00:00
import argparse
2020-06-18 18:43:20 +00:00
import traceback
2020-04-18 12:14:31 +00:00
from datetime import timezone , datetime
2019-10-21 20:07:17 +00:00
import tio
2020-04-18 12:14:31 +00:00
import db
2020-05-12 16:08:03 +00:00
import util
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
def timestamp ( ) : return int ( datetime . now ( tz = timezone . utc ) . timestamp ( ) )
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
# TODO refactor this
database = None
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
config = toml . load ( open ( " config.toml " , " r " ) )
2019-10-21 20:07:17 +00:00
logging . basicConfig ( level = logging . INFO , format = " %(levelname)s %(asctime)s %(message)s " , datefmt = " % H: % M: % S %d / % m/ % Y " )
2020-06-18 18:43:20 +00:00
bot = commands . Bot ( command_prefix = config [ " prefix " ] , description = " AutoBotRobot, the most useless bot in the known universe. " , case_insensitive = True )
2019-10-21 20:07:17 +00:00
bot . _skip_check = lambda x , y : False
2020-04-18 12:14:31 +00:00
def make_embed ( * , fields = [ ] , footer_text = None , * * kwargs ) :
embed = discord . Embed ( * * kwargs )
for field in fields :
if len ( field ) > 2 :
embed . add_field ( name = field [ 0 ] , value = field [ 1 ] , inline = field [ 2 ] )
else :
embed . add_field ( name = field [ 0 ] , value = field [ 1 ] , inline = False )
if footer_text :
embed . set_footer ( text = footer_text )
return embed
def error_embed ( msg , title = " Error " ) :
return make_embed ( color = config [ " colors " ] [ " error " ] , description = msg , title = title )
2019-10-21 20:07:17 +00:00
cleaner = discord . ext . commands . clean_content ( )
def clean ( ctx , text ) :
return cleaner . convert ( ctx , text )
@bot.command ( help = " Gives you a random fortune as generated by `fortune`. " )
async def fortune ( ctx ) :
await ctx . send ( subprocess . run ( [ " fortune " ] , stdout = subprocess . PIPE , encoding = " UTF-8 " ) . stdout )
@bot.command ( help = " Says Pong. " )
async def ping ( ctx ) :
await ctx . send ( " Pong. " )
@bot.command ( help = " Deletes the specified target. " , rest_is_raw = True )
async def delete ( ctx , * , raw_target ) :
2020-04-18 12:14:31 +00:00
target = await clean ( ctx , raw_target . strip ( ) . replace ( " \n " , " " ) )
if len ( target ) > 256 :
await ctx . send ( embed = error_embed ( " Deletion target must be max 256 chars " ) )
return
2019-10-21 20:07:17 +00:00
async with ctx . typing ( ) :
await ctx . send ( f " Deleting { target } ... " )
await asyncio . sleep ( 1 )
2020-04-18 12:14:31 +00:00
await database . execute ( " INSERT INTO deleted_items (timestamp, item) VALUES (?, ?) " , ( timestamp ( ) , target ) )
await database . commit ( )
2019-10-21 20:07:17 +00:00
await ctx . send ( f " Deleted { target } successfully. " )
@bot.command ( help = " View recently deleted things, optionally matching a filter. " )
async def list_deleted ( ctx , search = None ) :
acc = " Recently deleted: \n "
2020-04-18 12:17:31 +00:00
if search : acc = f " Recently deleted (matching { search } ): \n "
2020-04-18 12:14:31 +00:00
csr = None
if search :
csr = database . execute ( " SELECT * FROM deleted_items WHERE item LIKE ? ORDER BY timestamp DESC LIMIT 100 " , ( f " % { search } % " , ) )
else :
csr = database . execute ( " SELECT * FROM deleted_items ORDER BY timestamp DESC LIMIT 100 " )
async with csr as cursor :
async for row in cursor :
to_add = " - " + row [ 2 ] + " \n "
if len ( acc + to_add ) > 2000 :
break
acc + = to_add
2019-10-21 20:07:17 +00:00
await ctx . send ( acc )
2020-04-18 12:14:31 +00:00
# Python, for some *very intelligent reason*, makes the default ArgumetParser exit the program on error.
# This is obviously undesirable behavior in a Discord bot, so we override this.
class NonExitingArgumentParser ( argparse . ArgumentParser ) :
def exit ( self , status = 0 , message = None ) :
if status :
raise Exception ( f ' Flag parse error: { message } ' )
exit ( status )
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
EXEC_REGEX = " ^(.*)```([a-zA-Z0-9_ \\ -+]+)? \n (.*)```$ "
2019-10-21 20:07:17 +00:00
2020-04-18 12:14:31 +00:00
exec_flag_parser = NonExitingArgumentParser ( add_help = False )
exec_flag_parser . add_argument ( " --verbose " , " -v " , action = " store_true " )
exec_flag_parser . add_argument ( " --language " , " -L " )
2019-10-21 20:07:17 +00:00
@bot.command ( rest_is_raw = True , help = " Execute provided code (in a codeblock) using TIO.run. " )
async def exec ( ctx , * , arg ) :
match = re . match ( EXEC_REGEX , arg , flags = re . DOTALL )
if match == None :
2020-04-18 12:14:31 +00:00
await ctx . send ( embed = error_embed ( " Invalid format. Expected a codeblock. " ) )
return
flags_raw = match . group ( 1 )
flags = exec_flag_parser . parse_args ( flags_raw . split ( ) )
lang = flags . language or match . group ( 2 )
if not lang :
await ctx . send ( embed = error_embed ( " No language specified. Use the -L flag or add a language to your codeblock. " ) )
2019-10-21 20:07:17 +00:00
return
2020-04-18 12:14:31 +00:00
lang = lang . strip ( )
2019-10-21 20:07:17 +00:00
code = match . group ( 3 )
async with ctx . typing ( ) :
2020-04-18 12:14:31 +00:00
ok , real_lang , result , debug = await tio . run ( lang , code )
2019-10-21 20:07:17 +00:00
if not ok :
2020-04-18 12:14:31 +00:00
await ctx . send ( embed = error_embed ( f """ ``` { result } ``` """ , " Execution error " ) )
2019-10-21 20:07:17 +00:00
else :
out = result
2020-04-18 12:14:31 +00:00
if flags . verbose :
debug_block = f """ \n ``` { debug } \n Language: { real_lang } ``` """
out = out [ : 2000 - len ( debug_block ) ] + debug_block
else :
out = out [ : 2000 ]
2019-10-21 20:07:17 +00:00
await ctx . send ( out )
@bot.command ( help = " List supported languages, optionally matching a filter. " )
async def supported_langs ( ctx , search = None ) :
langs = sorted ( tio . languages ( ) )
acc = " "
for lang in langs :
if len ( acc + lang ) > 2000 :
await ctx . send ( acc )
acc = " "
if search == None or search in lang : acc + = lang + " "
2020-04-18 12:14:31 +00:00
if acc == " " : acc = " No results. "
2019-10-21 20:07:17 +00:00
await ctx . send ( acc )
2020-06-18 18:43:20 +00:00
@bot.command ( brief = " Set a reminder to be reminded about later. " , rest_is_raw = True , help = """ Sets a reminder which you will (probably) be reminded about at/after the specified time.
All times are UTC .
Reminders are checked every minute , so while precise times are not guaranteed , reminders should under normal conditions be received within 2 minutes of what you specify . """ )
2020-05-12 16:08:03 +00:00
async def remind ( ctx , time , * , reminder ) :
reminder = reminder . strip ( )
if len ( reminder ) > 512 :
await ctx . send ( embed = error_embed ( " Maximum reminder length is 512 characters " ) )
return
extra_data = {
" author_id " : ctx . author . id ,
" channel_id " : ctx . message . channel . id ,
" original_time_spec " : time
}
2020-06-18 18:43:20 +00:00
try :
time = util . parse_time ( time )
except :
await ctx . send ( embed = error_embed ( " Invalid time " ) )
return
2020-05-12 16:08:03 +00:00
await database . execute ( " INSERT INTO reminders (remind_timestamp, created_timestamp, reminder, expired, extra) VALUES (?, ?, ?, ?, ?) " ,
( time . timestamp ( ) , timestamp ( ) , reminder , 0 , json . dumps ( extra_data ) ) )
await database . commit ( )
await ctx . send ( f " Reminder scheduled for { util . format_time ( time ) } . " )
@tasks.loop ( seconds = 60 )
async def remind_worker ( ) :
csr = database . execute ( " SELECT * FROM reminders WHERE expired = 0 AND remind_timestamp < ? " , ( timestamp ( ) , ) )
to_expire = [ ]
async with csr as cursor :
async for row in cursor :
rid , remind_timestamp , created_timestamp , reminder_text , _ , extra = row
try :
remind_timestamp = datetime . utcfromtimestamp ( remind_timestamp )
created_timestamp = datetime . utcfromtimestamp ( created_timestamp )
extra = json . loads ( extra )
uid = extra [ " author_id " ]
text = f " <@ { uid } > Reminder queued at { util . format_time ( created_timestamp ) } : { reminder_text } "
channel = bot . get_channel ( extra [ " channel_id " ] )
await channel . send ( text )
to_expire . append ( rid )
except Exception as e :
logging . warning ( " Could not send reminder %d " , rid , exc_info = e )
for expiry_id in to_expire :
logging . info ( " Expiring reminder %d " , expiry_id )
await database . execute ( " UPDATE reminders SET expired = 1 WHERE id = ? " , ( expiry_id , ) )
await database . commit ( )
2020-06-18 18:43:20 +00:00
@bot.command ( help = " Get some information about the bot. " )
async def about ( ctx ) :
await ctx . send ( """ **AutoBotRobot: The least useful Discord bot ever designed.**
AutoBotRobot has many features , but not necessarily any practical ones .
It can execute code via TIO . run , do reminders , print fortunes , and not any more !
AutoBotRobot is open source - the code is available at < https : / / github . com / osmarks / autobotrobot > - and you could run your own instance if you wanted to and could get around the complete lack of user guide or documentation .
You can also invite it to your server : < https : / / discordapp . com / oauth2 / authorize ? & client_id = 509849474647064576 & scope = bot & permissions = 68608 >
""" )
@bot.group ( )
async def magic ( ctx ) :
if not await bot . is_owner ( ctx . author ) :
return await ctx . send ( embed = error_embed ( f " { ctx . author . name } is not in the sudoers file. This incident has been reported. " ) )
if ctx . invoked_subcommand == None :
return await ctx . send ( " Invalid magic command. " )
@magic.command ( rest_is_raw = True )
async def py ( ctx , * , code ) :
code = util . extract_codeblock ( code )
try :
loc = {
* * locals ( ) ,
" bot " : bot ,
" ctx " : ctx ,
}
result = await asyncio . wait_for ( util . async_exec ( code , loc , globals ( ) ) , timeout = 5.0 )
await ctx . send ( " ``` \n " + repr ( result ) . replace ( " ``` " , " \\ ` \\ ` \\ ` " ) [ : 1900 ] + " \n ``` " )
except TimeoutError :
await ctx . send ( embed = error_embed ( " Timed out. " ) )
except BaseException as e :
await ctx . send ( " Error: \n ``` \n " + traceback . format_exc ( ) . replace ( " ``` " , " \\ ` \\ ` \\ ` " ) [ : 1900 ] + " \n ``` " )
@magic.command ( rest_is_raw = True )
async def sql ( ctx , * , code ) :
code = util . extract_codeblock ( code )
csr = database . execute ( code )
out = " "
async with csr as cursor :
async for row in cursor :
out + = " ` " + " " . join ( map ( repr , row ) ) + " ` \n "
await ctx . send ( out [ : 1999 ] )
await database . commit ( )
2020-05-12 16:08:03 +00:00
@bot.event
async def on_ready ( ) :
logging . info ( " Connected as " + bot . user . name )
2020-06-18 18:43:20 +00:00
await bot . change_presence ( status = discord . Status . online , activity = discord . Activity ( type = discord . ActivityType . listening , name = f " commands beginning with { config [ ' prefix ' ] } " ) )
2020-05-12 16:08:03 +00:00
remind_worker . start ( )
2020-04-18 12:14:31 +00:00
async def run_bot ( ) :
global database
database = await db . init ( config [ " database " ] )
await bot . start ( config [ " token " ] )
if __name__ == ' __main__ ' :
loop = asyncio . get_event_loop ( )
try :
loop . run_until_complete ( run_bot ( ) )
except KeyboardInterrupt :
2020-05-12 16:08:03 +00:00
remind_worker . cancel ( )
2020-04-18 12:14:31 +00:00
loop . run_until_complete ( bot . logout ( ) )
finally :
2020-06-18 18:43:20 +00:00
loop . close ( )