import asyncio
import collections
import concurrent.futures
import html.parser
import io
import logging

import aiohttp
import discord
import discord.ext.commands as commands

import util

# Result links whose href contains this DuckDuckGo ad-provider URL are skipped.
_DDG_AD_MARKER = "https://duckduckgo.com/y.js?ad_provider"
_DDG_HTML_URL = "https://html.duckduckgo.com/html/"
_WP_API_URL = "https://en.wikipedia.org/w/api.php"


class Parser(html.parser.HTMLParser):
    """Collect result links from a DuckDuckGo HTML results page.

    After a page has been fed in, ``links`` holds, in document order, the
    ``href`` of every ``<a>`` tag whose ``class`` attribute is exactly
    ``result__a``, excluding links that go through the ad-provider URL.
    """

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        """Record the href of a result anchor; ignore every other tag."""
        if tag != "a":
            return
        attrs = dict(attrs)
        if attrs.get("class") != "result__a":
            return
        # An anchor may have no href at all, or a valueless one (parsed as
        # None); neither is a usable link, so skip instead of raising.
        href = attrs.get("href")
        if not href or _DDG_AD_MARKER in href:
            return
        self.links.append(href)


# NOTE: this class is documented with comments rather than a docstring on
# purpose. discord.py appears to use a cog's docstring as its help
# description, so adding one would change what users see.
#
# Cog providing two commands: a DuckDuckGo "first result" search and a
# Wikipedia intro-extract lookup. Wikipedia results are kept in two bounded,
# insertion-ordered caches:
#   wp_cache         title -> extract text
#   wp_search_cache  queried title -> title found by search (or None)
class Search(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.session = aiohttp.ClientSession()
        self.wp_cache = collections.OrderedDict()
        self.wp_search_cache = collections.OrderedDict()
        self.pool = None

    @staticmethod
    def _remember(cache, key, value):
        """Store ``key -> value`` in ``cache``, evicting oldest entries.

        The cache never ends up holding more than
        ``util.config["ir"]["cache_size"]`` entries. Inserting first and
        trimming afterwards keeps that bound exact and is safe even when the
        cache is empty or the limit is 0.
        """
        limit = util.config["ir"]["cache_size"]
        cache[key] = value
        while len(cache) > limit:
            cache.popitem(last=False)

    @commands.command()
    async def search(self, ctx, *, query):
        "Search using DuckDuckGo. Returns the first result as a link."
        async with ctx.typing():
            form = {"q": query, "d": ""}
            async with self.session.post(_DDG_HTML_URL, data=form) as resp:
                if resp.history:
                    # The request was redirected: reply with the URL it
                    # ended up at instead of parsing a results page.
                    return await ctx.send(str(resp.url), reference=ctx.message)

                parser = Parser()
                parser.feed(await resp.text())
                parser.close()

                if not parser.links:
                    return await ctx.send("No results.", reference=ctx.message)
                return await ctx.send(parser.links[0], reference=ctx.message)

    async def wp_search(self, query):
        """Return the title of the top Wikipedia search hit for ``query``.

        Returns ``None`` when the search yields no hits. A response without
        the expected ``query``/``search`` keys raises ``KeyError`` so that a
        malformed or error response is not mistaken for "no results".
        """
        params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "utf8": "1",
            "format": "json",
            "srlimit": 1,
        }
        async with self.session.get(_WP_API_URL, params=params) as resp:
            payload = await resp.json()
        hits = payload["query"]["search"]
        return hits[0]["title"] if hits else None

    async def wp_fetch(self, page, *, fallback=True):
        """Return the plain-text intro extract of Wikipedia page ``page``.

        Lookup order: the extract cache, then the search-result cache, then
        the Wikipedia API. When the API has no usable extract for ``page``
        and ``fallback`` is true, the title is resolved through
        ``wp_search`` and fetched once more with ``fallback=False`` (so the
        recursion is at most one level deep).

        Returns the extract text, or ``None`` when nothing was found.
        """

        async def fallback_to_search():
            if not fallback:
                return None
            new_page = await self.wp_search(page)
            # Misses (None) are cached too, so a title known to have no
            # search hit is not searched for again.
            self._remember(self.wp_search_cache, page, new_page)
            if new_page is None:
                return None
            return await self.wp_fetch(new_page, fallback=False)

        if page in self.wp_cache:
            return self.wp_cache[page]
        if page in self.wp_search_cache:
            resolved = self.wp_search_cache[page]
            if resolved is None:
                return None
            return await self.wp_fetch(resolved, fallback=False)

        params = {
            "action": "query",
            "format": "json",
            "titles": page,
            "prop": "extracts",
            "exintro": 1,
            "explaintext": 1,
        }
        async with self.session.get(_WP_API_URL, params=params) as resp:
            data = (await resp.json())["query"]

        # The "query" object does not always carry "pages", and a page entry
        # does not always carry "extract"; both count as "not found" here
        # rather than raising KeyError.
        pages = data.get("pages")
        if not pages or "-1" in pages:
            return await fallback_to_search()

        content = next(iter(pages.values())).get("extract")
        if not content:
            return await fallback_to_search()

        self._remember(self.wp_cache, page, content)
        return content

    @commands.command(aliases=["wp"])
    async def wikipedia(self, ctx, *, page):
        "Have you ever wanted the first section of a Wikipedia page? Obviously, yes. This gets that."
        content = await self.wp_fetch(page)
        if content is None:
            await ctx.send("Not found.")
            return
        # The extract is sent as a UTF-8 text attachment, not as message text.
        payload = io.BytesIO(content.encode("utf-8"))
        await ctx.send(file=discord.File(payload, "content.txt"))

    def cog_unload(self):
        """Release the HTTP session and, if one was created, the pool."""
        # session.close() is a coroutine and this hook is synchronous, so the
        # close is scheduled as a task (this needs a running event loop).
        asyncio.create_task(self.session.close())
        if self.pool is not None:
            self.pool.shutdown()


def setup(bot):
    """Extension entry point: create the Search cog and add it to ``bot``."""
    cog = Search(bot)
    bot.add_cog(cog)