# autobotrobot/src/search.py
import aiohttp
import discord
import asyncio
import logging
import discord.ext.commands as commands
import html.parser
import collections
import util
import io
import concurrent.futures
class Parser(html.parser.HTMLParser):
    """Extract organic result links from DuckDuckGo's HTML search results page.

    Hrefs of anchors with class "result__a" are appended to ``self.links`` in
    document order; links through DuckDuckGo's ad redirector are skipped.
    """

    def __init__(self):
        self.links = []
        super().__init__()

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag != "a" or attrs.get("class") != "result__a":
            return
        href = attrs.get("href")
        # Guard against a result anchor with no href (previously raised
        # KeyError), and drop ad-provider redirect links.
        if href and "https://duckduckgo.com/y.js?ad_provider" not in href:
            self.links.append(href)
class Search(commands.Cog):
    """Cog providing DuckDuckGo web search and Wikipedia intro lookups.

    Keeps two bounded caches (OrderedDict, oldest entry evicted first) sized
    by util.config["ir"]["cache_size"]: one for page-title -> intro text, one
    for raw-query -> resolved page title.
    """

    def __init__(self, bot):
        self.bot = bot
        # NOTE(review): ClientSession is created here, outside a coroutine;
        # newer aiohttp versions warn about this — confirm against the
        # aiohttp version in use.
        self.session = aiohttp.ClientSession()
        # page title -> intro extract
        self.wp_cache = collections.OrderedDict()
        # raw query -> resolved title (None is cached for "no results")
        self.wp_search_cache = collections.OrderedDict()
        # executor slot; never assigned in this file, only shut down on unload
        self.pool = None

    @commands.command()
    async def search(self, ctx, *, query):
        "Search using DuckDuckGo. Returns the first result as a link."
        async with ctx.typing():
            async with self.session.post("https://html.duckduckgo.com/html/", data={ "q": query, "d": "" }) as resp:
                if resp.history:
                    # The request was redirected (e.g. a !bang query) — DDG
                    # already chose a destination, so report the final URL.
                    await ctx.send(resp.url, reference=ctx.message)
                else:
                    p = Parser()
                    txt = await resp.text()
                    p.feed(txt)
                    p.close()
                    try:
                        return await ctx.send(p.links[0], reference=ctx.message)
                    except IndexError:
                        # Parser collected no result links.
                        return await ctx.send("No results.", reference=ctx.message)

    async def wp_search(self, query):
        # Resolve a free-text query to the title of the top Wikipedia search
        # hit, or None when the search returns nothing.
        async with self.session.get("https://en.wikipedia.org/w/api.php",
            params={ "action": "query", "list": "search", "srsearch": query, "utf8": "1", "format": "json", "srlimit": 1 }) as resp:
            data = (await resp.json())["query"]["search"]
            if len(data) > 0: return data[0]["title"]
            else: return None

    async def wp_fetch(self, page, *, fallback=True):
        # Fetch the intro extract for `page`, consulting both caches first.
        # On an exact-title miss with `fallback` set, retry once through
        # full-text search; the recursive call passes fallback=False so a
        # second miss cannot loop.
        async def fallback_to_search():
            if fallback:
                new_page = await self.wp_search(page)
                # Cache the query -> title resolution (including None misses),
                # evicting the oldest entry past the configured size.
                if len(self.wp_search_cache) > util.config["ir"]["cache_size"]:
                    self.wp_search_cache.popitem(last=False)
                self.wp_search_cache[page] = new_page
                if new_page is None: return None
                return await self.wp_fetch(new_page, fallback=False)
        if page in self.wp_cache: return self.wp_cache[page]
        if page in self.wp_search_cache:
            if self.wp_search_cache[page] is None: return None
            return await self.wp_fetch(self.wp_search_cache[page], fallback=False)
        async with self.session.get("https://en.wikipedia.org/w/api.php",
            params={ "action": "query", "format": "json", "titles": page, "prop": "extracts", "exintro": 1, "explaintext": 1 }) as resp:
            data = (await resp.json())["query"]
            # The MediaWiki API reports a missing title as a page keyed "-1".
            if "-1" in data["pages"]:
                return await fallback_to_search()
            else:
                content = next(iter(data["pages"].values()))["extract"]
                # An empty extract is treated the same as a missing page.
                if not content: return await fallback_to_search()
                if len(self.wp_cache) > util.config["ir"]["cache_size"]:
                    self.wp_cache.popitem(last=False)
                self.wp_cache[page] = content
                return content

    @commands.command(aliases=["wp"])
    async def wikipedia(self, ctx, *, page):
        "Have you ever wanted the first section of a Wikipedia page? Obviously, yes. This gets that."
        content = await self.wp_fetch(page)
        if content is None:
            await ctx.send("Not found.")
        else:
            # Attach as a file so long extracts aren't limited by Discord's
            # message length cap.
            f = io.BytesIO(content.encode("utf-8"))
            file = discord.File(f, "content.txt")
            await ctx.send(file=file)

    def cog_unload(self):
        # Session close is a coroutine but cog_unload is sync here, so
        # schedule it on the running loop instead of awaiting.
        asyncio.create_task(self.session.close())
        if self.pool is not None:
            self.pool.shutdown()
def setup(bot):
    "Entry point used by discord.py's extension loader: register the Search cog."
    bot.add_cog(Search(bot))