mirror of https://github.com/osmarks/random-stuff, synced 2024-11-09 13:59:55 +00:00
Captive portal script
This commit is contained in:
parent 37bc5f906e
commit 787ddbb5bb
.gitignore (vendored): 10 changes

@@ -1,3 +1,11 @@
 out.wav
 */target/*
 code-guessing/analytics/people
+*.aux
+*.log
+*.blg
+*latexmk
+*.fls
+*.bbl
+*.out
+*.gz
README.md

@@ -60,4 +60,5 @@ This comes with absolutely no guarantee of support or correct function, although
 * `screensaver.html` - an attempt to replicate one of Apple's screensavers (incomplete).
 * `StringGroup.hs` - native Haskell strings are only a monoid, so I made an improved version which forms a group instead.
 * `arithmetic_coder.py` - arithmetic coding implementation apart from the fact that it doesn't work properly (it doesn't write enough digits for some reason I haven't worked out).
 * `bmp280_prometheus.py` - read BMP280 temperature/pressure sensors and export as Prometheus metrics.
+* `captive_portal.py` - automatically fill in captive portal forms (WIP).
captive_portal.py: 163 lines (new file)

@@ -0,0 +1,163 @@
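# Detect a captive portal using Firefox's detectportal.firefox.com check, then
# crawl the intercepted page and try to auto-fill and submit its forms (WIP).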
import subprocess
import requests
from bs4 import BeautifulSoup
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import urllib.parse, urllib3
from dns.resolver import Resolver
import dns.resolver
import string
import random

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

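# requests transport adapter which resolves hostnames via a specific DNS server
# (the one handed out by the portal's network) instead of the system resolver,
# while keeping the original hostname for SNI and the Host header.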
class CustomDNSAdapter(HTTPAdapter):
    def __init__(self, dns_server, *args, **kwargs):
        self.dns_server = dns_server
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        connection_pool_kwargs = self.poolmanager.connection_pool_kw

        old_url = request.url
        parsed_url = urllib.parse.urlparse(request.url)
        hostname = parsed_url.hostname
        connection_pool_kwargs["server_hostname"] = hostname
        request.headers["Host"] = hostname

        custom_resolver = Resolver()
        custom_resolver.nameservers = [self.dns_server]
        try:
            ip = custom_resolver.resolve(hostname).rrset[0].to_text()
            request.url = parsed_url._replace(netloc=ip).geturl()
        except dns.resolver.NXDOMAIN:
            pass

        response = super().send(request, **kwargs)
        response.url = old_url
        return response

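# Shared HTTP session and crawl parameters: Firefox's captive portal detection URL
# and its expected response body, keywords marking likely login/registration links,
# and the domain used for generated email addresses.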
session = requests.Session()

DETECTPORTAL_URL = "http://detectportal.firefox.com/canonical.html"
DETECTPORTAL_CONTENT = '<meta http-equiv="refresh" content="0;url=https://support.mozilla.org/kb/captive-portal"/>'
PRIORITY_KEYWORDS = {"registr", "login", "signup", "signin"}
EMAIL_BASE = "0t.lt"

FIELDTYPES = ("email", "postcode")

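# Crude check for login/registration-style URLs so they can be crawled first.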
def is_priority(url):
    url = url.replace("-", "").replace("_", "")
    for keyword in PRIORITY_KEYWORDS:
        if keyword in url:
            return True
    return False

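# Ask NetworkManager for the first DNS server of the current connection.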
def get_dns():
    constate = subprocess.check_output(["nmcli", "dev", "show"]).decode("utf-8")
    for line in constate.splitlines():
        if ":" not in line: continue  # nmcli separates devices with blank lines
        name, val = line.split(":", 1)
        if name == "IP4.DNS[1]":
            return val.strip()

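# Wire the session up to use the network's DNS server, skip certificate
# verification (portals frequently intercept HTTPS with their own certificates)
# and present an ordinary browser User-Agent. `tried` holds visited URLs and
# `queue` is the crawl frontier.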
dns_server = get_dns()
adapter = CustomDNSAdapter(dns_server)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.verify = False
session.headers.update({"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"})

tried = set()
queue = []

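# Filler values for forms: a random mailbox under EMAIL_BASE and a random
# capitalised word for anything else.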
def generate_email():
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=10)) + "@" + EMAIL_BASE

def generate_generic():
    return "".join(random.choices(string.ascii_lowercase, k=10)).title()

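# Process one portal page: queue its links (priority ones first) and try to fill
# in and submit every form on it.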
def handle_response(response):
    tried.add(response.url)
    soup = BeautifulSoup(response.text, "html.parser")
    queue_ext = []
    for link in soup.find_all("a"):
        if href := link.get("href"):
            href = urllib.parse.urljoin(response.url, href)
            if is_priority(href):
                queue_ext.insert(0, href)
            else:
                queue_ext.append(href)

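    # Guess a value for each form field from its name/id/placeholder, then submit.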
    for form in soup.find_all("form"):
        fields = {}

        for input in form.find_all("input"):
            name = input.get("name", "")
            if not name: continue

            fieldtype = None
            for ty in FIELDTYPES:
                if ty in name.lower():
                    fieldtype = ty
                elif ty in input.get("id", "").lower():
                    fieldtype = ty
                elif ty in input.get("placeholder", "").lower().replace(" ", ""):
                    fieldtype = ty

            match fieldtype:
                case "email":
                    fields[name] = generate_email()
                case "postcode":
                    fields[name] = "W1A 1AA" # ISO standard postcode (real)
                case None:
                    fields[name] = generate_generic()

            if input.get("type") == "checkbox":
                fields[name] = input.get("value", "on")
            if input.get("type") == "hidden":
                fields[name] = input.get("value")

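        # <select> elements: use the first enabled option's value, or a random
        # string if there are no usable options.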
        for select in form.find_all("select"):
            name = select.get("name", "")
            if not name: continue

            for option in select.find_all("option"):
                if option.get("disabled") is not None: continue
                fields[name] = option.get("value", generate_generic())
                break  # take the first enabled option; without a break the else branch always overrode it
            else:
                fields[name] = generate_generic()
                continue

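        # Submit to the form's action (resolved against the page URL) using the
        # declared method, and recurse into the response.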
        action = urllib.parse.urljoin(response.url, form.get("action", ""))
        if form.get("method") == "get":
            response = session.get(action, params=fields)
        else:
            response = session.post(action, data=fields)
        handle_response(response)

    queue.extend(x for x in queue_ext if x not in tried)

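# Main loop: while the detectportal check still returns the portal's page rather
# than the expected content, keep crawling queued URLs and submitting forms, until
# the queue empties or roughly 100 URLs have been tried.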
while True:
    response = session.get(DETECTPORTAL_URL)
    if response.text == DETECTPORTAL_CONTENT:
        print("OK")
        raise SystemExit(0)
    handle_response(response)
    try:
        next_url = queue.pop(0)
    except IndexError:
        print("No more URLs to try")
        break
    try:
        response = session.get(next_url)
    except Exception as e:
        print(e)
        continue
    handle_response(response)
    if len(tried) > 100:
        print("Tries exceeded")
        break

print("KO")
raise SystemExit(1)