1
0
mirror of https://github.com/osmarks/meme-search-engine.git synced 2024-11-14 07:44:49 +00:00
meme-search-engine/meme-rater/crawler.py
osmarks bd426a30ba Port meme acquisition pipeline to new API, database
Also fix a really stupid oversight in crawling code.
2024-05-22 15:43:56 +01:00

136 lines
4.9 KiB
Python

import aiohttp
import asyncio
import aiofiles
import os.path
import hashlib
import json
import time
import sys
async def fetch_list_seg(sess, list_url, query):
async with sess.get(list_url + ".json", params=query) as res:
return await res.json()
async def fetch_past(sess, list_url, n):
after = None
count = 0
while count < n:
args = { "count": 25 }
if after is not None: args["after"] = after
chunk = await fetch_list_seg(sess, list_url, args)
if "data" not in chunk:
print("\n", chunk)
await asyncio.sleep(400)
continue
new_items = chunk["data"]["children"]
yield [ i["data"] for i in new_items ]
count += len(new_items)
print("\nup to", count)
after = new_items[-1]["data"]["name"]
SEEN_ITEMS_SIZE = 200
async def fetch_stream(sess, list_url):
# dicts are ordered, so this is a very janky ordered set implementation
seen = {}
while True:
list_items = (await fetch_list_seg(sess, list_url, {}))["data"]["children"]
new = [ i["data"] for i in list_items if i["data"]["name"] not in seen ]
# yield the new items
for n in new: yield n
# add new items to list of seen things
seen.update(dict.fromkeys(i["name"] for i in new))
# remove old seen items until it's a reasonable size
while len(seen) > SEEN_ITEMS_SIZE: seen.pop(next(iter(seen.keys())))
# compute average time between posts and wait that long for next fetch cycle
times = [ i["data"]["created"] for i in list_items ]
timediffs = list(map(lambda x: x[0] - x[1], zip(times, times[1:])))
average = sum(timediffs) / len(timediffs)
await asyncio.sleep(average)
def bucket(id): return hashlib.md5(id.encode("utf-8")).hexdigest()[:2]
filetypes = {
"image/png": "png",
"image/jpeg": "jpg",
"image/webp": "webp",
"image/avif": "avif"
}
hard_exclude = {
".mp4",
".mkv",
".webm"
}
CHUNK_SIZE = 1<<18 # entirely arbitrary
async def download(sess, url, file):
async with sess.get(url) as res:
ctype = res.headers.get("content-type")
if ctype not in filetypes: return
if int(res.headers.get("content-length", 1e9)) > 8e6: return
async with aiofiles.open(file + "." + filetypes[ctype], mode="wb") as fh:
while chunk := await res.content.read(CHUNK_SIZE):
await fh.write(chunk)
return dict(res.headers)
async def main(time_threshold):
sem = asyncio.Semaphore(16)
async with aiohttp.ClientSession() as sess:
async def download_item(item):
#print("starting on", item["name"])
print(".", end="")
sys.stdout.flush()
if item["over_18"] or not item["is_robot_indexable"]: return
id = item["name"]
bck = bucket(id)
os.makedirs(os.path.join("images", bck), exist_ok=True)
os.makedirs(os.path.join("meta", bck), exist_ok=True)
if not item.get("preview"): return
if not item["url"].startswith("https://"): return
meta_path = os.path.join("meta", bck, id + ".json")
if not os.path.exists(meta_path): # sorry
print("|", end="")
sys.stdout.flush()
excluded = False
for excl in hard_exclude:
if item["url"].endswith(excl):
excluded = True
break
try:
if not excluded:
result = await download(sess, item["url"], os.path.join("images", bck, id))
except Exception as e:
print("\nMeme acquisition failure:", e)
return
if result:
item["headers"] = result
with open(meta_path, "w") as fh:
json.dump(item, fh)
else:
print("!", end="")
sys.stdout.flush()
#print("done on", id)
async def dl_task(item):
async with sem:
try:
await asyncio.wait_for(download_item(item), timeout=30)
except asyncio.TimeoutError: pass
async for items in fetch_past(sess, "https://www.reddit.com/user/osmarks/m/memeharvesting/new", 20000):
#print("got new chunk")
await sem.acquire()
sem.release()
#print("downloading new set")
async with asyncio.TaskGroup() as tg:
for item in items:
if time_threshold and time_threshold > item["created"]:
return
tg.create_task(dl_task(item))
if __name__ == "__main__":
threshold = None
if len(sys.argv) > 1:
print("thresholding at", sys.argv[1])
threshold = float(sys.argv[1])
asyncio.run(main(threshold))