2021-02-25 17:48:06 +00:00
import asyncio
import prometheus_client
import dataclasses
import typing
import collections
import logging
2021-07-28 19:30:37 +00:00
import discord
2021-02-25 17:48:06 +00:00
import util
@dataclasses.dataclass
class AuthorInfo:
    """Describes the author of a bridged Message."""

    # Author's name.
    name: str
    # Author identifier; the concrete type is not fixed here, hence Any.
    id: typing.Any
    # Avatar URL, or None when not provided.
    avatar_url: typing.Optional[str] = None
    # When True, push() charges this author's messages 2.0 rate-limit tokens
    # instead of 1.0.
    deprioritize: bool = False
def unpack_dataclass_without(d, without):
    """Return the fields of dataclass instance ``d`` as a dict, minus one field.

    ``without`` is the name of the field to leave out. Raises KeyError when
    ``without`` is not one of the dataclass's fields.
    """
    remaining = {name: getattr(d, name) for name in type(d).__dataclass_fields__}
    # pop() without a default keeps the KeyError for unknown field names
    remaining.pop(without)
    return remaining
@dataclasses.dataclass
class Message:
    """A message handed to the event bus via push()."""

    # Who sent the message.
    author: AuthorInfo
    # Message content as a list of parts, each either a string or a dict.
    message: list[typing.Union[str, dict]]
    # Where the message came from, as a pair; push() uses source[0] as the
    # "source_type" metric label and the whole pair as the key into links.
    source: tuple[str, typing.Any]
    id: int
    attachments: list[discord.Attachment]
    # Optional (author, text) pair - presumably the message being replied to;
    # confirm against the code that constructs Message.
    reply: typing.Optional[tuple[AuthorInfo, str]] = None
2021-02-25 17:48:06 +00:00
# Prometheus counters, labelled with the type of the channel a message came from.
# Metric and label names are written without surrounding whitespace: Prometheus
# names may only contain letters, digits, underscores (and colons for metrics).
evbus_messages = prometheus_client.Counter("abr_evbus_messages", " Messages processed by event bus ", ["source_type"])
evbus_messages_dropped = prometheus_client.Counter("abr_evbus_messages_dropped", " Messages received by event bus but dropped by rate limits ", ["source_type"])
# maps each bridge destination type (discord/APIONET/etc) to the set of listeners registered for it
listeners = collections.defaultdict(set)
# maintains all the unidirectional links between channels - key is the source, value is the set of its direct targets
links = collections.defaultdict(set)
def find_all_destinations(source):
    """Return the set of every channel reachable from ``source`` via ``links``.

    Links are followed transitively. The result can contain ``source`` itself
    when some chain of links leads back to it (e.g. a bidirectional bridge).
    Returns an empty set when ``source`` has no outgoing links.
    """
    visited = set()
    # .get() instead of indexing: indexing the defaultdict would insert an
    # empty entry for every channel that is ever looked up.
    pending = set(links.get(source, ()))
    while pending:
        current = pending.pop()
        visited.add(current)
        pending.update(adjacent for adjacent in links.get(current, ()) if adjacent not in visited)
    return visited
2021-07-17 21:09:17 +00:00
# Token bucket parameters: up to RATE messages per PER microseconds from each input channel
RATE = 10.0
PER = 5000000.0  # µs
# Per-source bucket state: remaining allowance and the time it was last updated
RLData = collections.namedtuple("RLData", ["allowance", "last_check"])
# New buckets start full. last_check comes from timestamp_µs() so that it is on
# the same clock push() subtracts it from.
rate_limiting = collections.defaultdict(lambda: RLData(RATE, util.timestamp_µs()))
# Strong references to in-flight listener tasks. The event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected
# before it finishes.
_listener_tasks = set()


async def push(msg: Message):
    """Rate-limit ``msg`` and fan it out to the listeners of every destination.

    Destinations are all channels reachable from ``msg.source``. When there is
    at least one, a per-source token bucket is applied: the message is counted
    in ``evbus_messages_dropped`` and discarded if less than one token is
    available. Otherwise it is counted in ``evbus_messages`` and each listener
    registered for a destination's type is scheduled (not awaited) with
    ``(dest_channel, msg)``. The source channel itself is never a destination.
    """
    destinations = find_all_destinations(msg.source)
    if destinations:
        # "token bucket" rate limiting algorithm - at most RATE messages per PER µs;
        # deprioritized authors are charged double
        # TODO: maybe separate buckets for bot and unbot?
        now = util.timestamp_µs()
        bucket = rate_limiting[msg.source]
        # refill proportionally to the elapsed time, capped at a full bucket
        allowance = min(RATE, bucket.allowance + (now - bucket.last_check) * (RATE / PER))
        if allowance < 1:
            rate_limiting[msg.source] = RLData(allowance, now)
            evbus_messages_dropped.labels(msg.source[0]).inc()
            return
        allowance -= 2.0 if msg.author.deprioritize else 1.0
        rate_limiting[msg.source] = RLData(allowance, now)

    evbus_messages.labels(msg.source[0]).inc()
    for dest in destinations:
        if dest == msg.source:
            continue
        dest_type, dest_channel = dest
        # .get() avoids creating an empty listener set for unknown types
        for listener in listeners.get(dest_type, ()):
            task = asyncio.ensure_future(listener(dest_channel, msg))
            _listener_tasks.add(task)
            task.add_done_callback(_listener_tasks.discard)
2021-03-25 17:56:29 +00:00
def add_listener(s, l):
    """Register ``l`` as a listener for destination type ``s``.

    Returns a zero-argument callable that removes the listener again; calling
    it when the listener is no longer registered raises KeyError.
    """
    registered = listeners[s]
    registered.add(l)

    def unregister():
        return listeners[s].remove(l)

    return unregister
2021-02-25 17:48:06 +00:00
2021-04-05 18:08:37 +00:00
async def add_bridge_link(db, c1, c2, cause=None, bidirectional=True):
    """Link channel ``c1`` to ``c2`` in memory and persist the link via ``db``.

    ``c1`` and ``c2`` are indexable pairs whose two elements are stored as
    separate columns. With ``bidirectional`` (the default) the reverse link
    ``c2`` -> ``c1`` is created as well. ``cause`` is stored alongside each row.
    Existing rows are left untouched (ON CONFLICT DO NOTHING).
    """
    logging.info(" Bridging %s and %s (bidirectional: %s ) ", repr(c1), repr(c2), bidirectional)
    directions = [(c1, c2), (c2, c1)] if bidirectional else [(c1, c2)]
    # update the in-memory graph first, then write the rows
    for origin, target in directions:
        links[origin].add(target)
    # NOTE(review): values are inserted positionally, so this relies on the
    # links table's column order - confirm it against the schema.
    for first, second in directions:
        await db.execute(
            " INSERT INTO links VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING ",
            (first[0], first[1], second[0], second[1], util.timestamp(), cause),
        )
    await db.commit()
2021-04-05 18:08:37 +00:00
async def remove_bridge_link(db, c1, c2, bidirectional=True):
    """Remove the link from ``c1`` to ``c2`` in memory and delete it via ``db``.

    With ``bidirectional`` (the default) the reverse link is removed too.
    Raises KeyError, before touching the database, if a link being removed is
    not present in memory.
    """
    logging.info(" Unbridging %s and %s (bidirectional: %s ) ", repr(c1), repr(c2), bidirectional)
    directions = [(c1, c2), (c2, c1)] if bidirectional else [(c1, c2)]
    for origin, target in directions:
        links[origin].remove(target)
    # NOTE(review): the first pair element is bound to the to_* columns here,
    # while in memory it is the source side of the link. With
    # bidirectional=False this may target the row for the opposite direction -
    # confirm against the table schema and the INSERT in add_bridge_link.
    delete_sql = " DELETE FROM links WHERE (to_type = ? AND to_id = ?) AND (from_type = ? AND from_id = ?) "
    for first, second in directions:
        await db.execute(delete_sql, (first[0], first[1], second[0], second[1]))
    await db.commit()
async def initial_load(db):
    """Populate the in-memory ``links`` graph from the ``links`` database table.

    Each row contributes one directed link from ``(from_type, from_id)`` to
    ``(to_type, to_id)``. Rows must support lookup by column name.
    """
    rows = await db.execute_fetchall(" SELECT * FROM links ")
    for row in rows:
        # column names must match exactly - no surrounding whitespace
        origin = (row["from_type"], row["from_id"])
        target = (row["to_type"], row["to_id"])
        links[origin].add(target)
    logging.info(" Loaded %d links ", len(rows))