1
0
mirror of https://github.com/janeczku/calibre-web synced 2025-01-04 14:30:31 +00:00
calibre-web/cps/cw_advocate/addrvalidator.py

282 lines
11 KiB
Python

#
# Copyright 2015 Jordan Milne
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Source: https://github.com/JordanMilne/Advocate
import functools
import fnmatch
import ipaddress
import re
try:
import netifaces
HAVE_NETIFACES = True
except ImportError:
netifaces = None
HAVE_NETIFACES = False
from .exceptions import NameserverException, ConfigException
def canonicalize_hostname(hostname):
    """Return ``hostname`` IDNA-encoded (punycode) and lowercased, as a str.

    Raises whatever the ``idna`` codec raises for hostnames it cannot encode.
    """
    # Encode first, lowercase second: we only want to lowercase the *ASCII*
    # characters, and lowercasing the encoded bytes does exactly that.
    # TODO: The differences between IDNA2003 and IDNA2008 might be relevant
    # to us, but both specs are confusing.
    encoded = hostname.encode("idna")
    lowered = encoded.lower()
    return lowered.decode("utf-8")
def determine_local_addresses():
    """Get all IPs that refer to this machine according to netifaces.

    Walks every interface reported by ``netifaces`` and collects its
    ``AF_INET`` and ``AF_INET6`` addresses.

    Returns:
        list: one ``ipaddress`` network object (built with
        ``ipaddress.ip_network``) per address found.

    Raises:
        ConfigException: if the ``netifaces`` module could not be imported.
    """
    if not HAVE_NETIFACES:
        raise ConfigException("Tried to determine local addresses, "
                              "but netifaces module was not importable")
    ips = []
    for interface in netifaces.interfaces():
        if_families = netifaces.ifaddresses(interface)
        for family_kind in (netifaces.AF_INET, netifaces.AF_INET6):
            for entry in if_families.get(family_kind, []):
                addr = entry.get("addr")
                if not addr:
                    # A record without an address gives us nothing to
                    # register; passing "" to ip_network() would raise.
                    continue
                if family_kind == netifaces.AF_INET6:
                    # We can't do anything sensible with the scope here,
                    # so drop the "%<scope>" suffix.
                    addr = addr.split("%")[0]
                ips.append(ipaddress.ip_network(addr))
    return ips
def add_local_address_arg(func):
    """Decorator: supply the ``_local_addresses`` kwarg if it's missing.

    This information shouldn't be cached between calls (what if one of the
    adapters got a new IP at runtime?), and we don't want each function to
    recalculate it. So it is only calculated when the caller didn't provide
    it for us.

    The wrapped callable must be a method of an object exposing an
    ``autodetect_local_addresses`` attribute. When that attribute is truthy
    the addresses come from ``determine_local_addresses()``; otherwise an
    empty list is used.

    An explicit ``_local_addresses=None`` is treated the same as omitting
    the argument. The wrapper only looks at keyword arguments, so
    ``_local_addresses`` has to be passed by keyword.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # `None` is the wrapped methods' declared default, so treat it as
        # "not supplied" rather than forwarding an un-iterable value.
        if kwargs.get("_local_addresses") is None:
            if self.autodetect_local_addresses:
                kwargs["_local_addresses"] = determine_local_addresses()
            else:
                kwargs["_local_addresses"] = []
        return func(self, *args, **kwargs)
    return wrapper
class AddrValidator:
    """Decide whether an IP, hostname or ``getaddrinfo`` record may be used.

    Combines user-supplied IP / port / hostname black- and whitelists with
    built-in checks that reject local, private, loopback, link-local,
    multicast, reserved and unspecified addresses, as well as IPv4
    addresses nested inside IPv6 tunnelling formats.
    """

    _6TO4_RELAY_NET = ipaddress.ip_network("192.88.99.0/24")
    # Just the well known prefix, DNS64 servers can set their own
    # prefix, but in practice most probably don't.
    _DNS64_WK_PREFIX = ipaddress.ip_network("64:ff9b::/96")
    DEFAULT_PORT_WHITELIST = {80, 8080, 443, 8443, 8000}

    def __init__(
            self,
            ip_blacklist=None,
            ip_whitelist=None,
            port_whitelist=None,
            port_blacklist=None,
            hostname_blacklist=None,
            allow_ipv6=False,
            allow_teredo=False,
            allow_6to4=False,
            allow_dns64=False,
            # Must be explicitly set to "False" if you don't want to try
            # detecting local interface addresses with netifaces.
            autodetect_local_addresses=True,
    ):
        """Configure the validator.

        Args:
            ip_blacklist: networks (anything supporting ``addr in net``)
                whose addresses are rejected.
            ip_whitelist: networks whose addresses are always accepted;
                checked before the blacklist.
            port_whitelist: if non-empty, only these ports are accepted.
                Defaults to ``DEFAULT_PORT_WHITELIST`` when neither port
                list is given.
            port_blacklist: ports that are rejected.
            hostname_blacklist: glob strings or regex patterns matching
                hostnames that are rejected.
            allow_ipv6: accept IPv6 addresses at all.
            allow_teredo: accept IPv6 Teredo addresses.
            allow_6to4: accept IPv6 6to4 addresses.
            allow_dns64: accept addresses in the DNS64 well-known prefix.
            autodetect_local_addresses: whether local interface addresses
                should be detected when a caller does not supply them.
        """
        if not port_blacklist and not port_whitelist:
            # An assortment of common HTTP(S) ports.
            port_whitelist = self.DEFAULT_PORT_WHITELIST.copy()
        self.ip_blacklist = ip_blacklist or set()
        self.ip_whitelist = ip_whitelist or set()
        self.port_blacklist = port_blacklist or set()
        self.port_whitelist = port_whitelist or set()
        # TODO: ATM this can contain either regexes or globs that are converted
        # to regexes upon every check. Create a collection that automagically
        # converts them to regexes on insert?
        self.hostname_blacklist = hostname_blacklist or set()
        self.allow_ipv6 = allow_ipv6
        self.allow_teredo = allow_teredo
        self.allow_6to4 = allow_6to4
        self.allow_dns64 = allow_dns64
        self.autodetect_local_addresses = autodetect_local_addresses

    @add_local_address_arg
    def is_ip_allowed(self, addr_ip, _local_addresses=None):
        """Return True if ``addr_ip`` passes every configured check.

        Args:
            addr_ip: an ``ipaddress`` address object, or anything
                ``ipaddress.ip_address`` can parse.
            _local_addresses: networks that refer to this machine; must be
                passed by keyword. ``None`` is treated as "no local
                addresses".

        Raises:
            ValueError: if the address is neither IPv4 nor IPv6, or cannot
                be parsed by ``ipaddress.ip_address``.
        """
        if not isinstance(addr_ip,
                          (ipaddress.IPv4Address, ipaddress.IPv6Address)):
            addr_ip = ipaddress.ip_address(addr_ip)
        local_nets = _local_addresses or ()

        # The whitelist should take precedence over the blacklist so we can
        # punch holes in blacklisted ranges
        if any(addr_ip in net for net in self.ip_whitelist):
            return True
        if any(addr_ip in net for net in self.ip_blacklist):
            return False
        if any(addr_ip in net for net in local_nets):
            return False

        if addr_ip.version == 4:
            if not addr_ip.is_private:
                # IPs for carrier-grade NAT. Seems weird that it doesn't set
                # `is_private`, but we need to check `not is_global`
                if not ipaddress.ip_network(addr_ip).is_global:
                    return False
        elif addr_ip.version == 6:
            # You'd better have a good reason for enabling IPv6
            # because Advocate's techniques don't work well without NAT.
            if not self.allow_ipv6:
                return False

            # v6 addresses can also map to IPv4 addresses! Tricky!
            v4_nested = []
            if addr_ip.ipv4_mapped:
                v4_nested.append(addr_ip.ipv4_mapped)
            # IPv6 has a lot of tunneling mechanisms.
            # XXX: Do we even really care about these? If we're tunneling
            # through public servers we shouldn't be able to access
            # addresses on our private network, right?
            if addr_ip.sixtofour:
                if not self.allow_6to4:
                    return False
                v4_nested.append(addr_ip.sixtofour)
            if addr_ip.teredo:
                if not self.allow_teredo:
                    return False
                # Check both the client *and* server IPs
                v4_nested.extend(addr_ip.teredo)
            if addr_ip in self._DNS64_WK_PREFIX:
                if not self.allow_dns64:
                    return False
                # When using the well-known prefix the last 4 bytes
                # are the IPv4 addr
                v4_nested.append(ipaddress.ip_address(addr_ip.packed[-4:]))

            # Forward the local addresses we already have so they are not
            # recomputed for every nested address.
            if not all(
                    self.is_ip_allowed(addr_v4, _local_addresses=local_nets)
                    for addr_v4 in v4_nested
            ):
                return False

            # fec0::*, apparently deprecated?
            if addr_ip.is_site_local:
                return False
        else:
            raise ValueError("Unsupported IP version(?): %r" % addr_ip)

        # 169.254.XXX.XXX, AWS uses these for autoconfiguration
        if addr_ip.is_link_local:
            return False
        # 127.0.0.1, ::1, etc.
        if addr_ip.is_loopback:
            return False
        if addr_ip.is_multicast:
            return False
        # 192.168.XXX.XXX, 10.XXX.XXX.XXX
        if addr_ip.is_private:
            return False
        # 255.255.255.255, ::ffff:XXXX:XXXX (v6->v4) mapping
        if addr_ip.is_reserved:
            return False
        # There's no reason to connect directly to a 6to4 relay
        if addr_ip in self._6TO4_RELAY_NET:
            return False
        # 0.0.0.0
        if addr_ip.is_unspecified:
            return False

        # It doesn't look bad, so... it must be ok!
        return True

    def _hostname_matches_pattern(self, hostname, pattern):
        """Return True if ``hostname`` matches ``pattern``.

        ``pattern`` is either a glob string or a regex; ``hostname`` is
        canonicalized before matching.
        """
        # If they specified a string, just assume they only want basic globbing.
        # This stops people from not realizing they're dealing in REs and
        # not escaping their periods unless they specifically pass in an RE.
        # This has the added benefit of letting us sanely handle globbed
        # IDNs by default.
        if isinstance(pattern, str):
            # convert the glob to a punycode glob, then a regex
            pattern = fnmatch.translate(canonicalize_hostname(pattern))

        hostname = canonicalize_hostname(hostname)
        # Down the line the hostname may get treated as a null-terminated string
        # (as with `socket.getaddrinfo`.) Try to account for that.
        #
        #    >>> socket.getaddrinfo("example.com\x00aaaa", 80)
        #    [(2, 1, 6, '', ('93.184.216.34', 80)), [...]
        no_null_hostname = hostname.split("\x00")[0]

        # Surrounding dots are stripped before matching either candidate.
        return any(re.match(pattern, x.strip(".")) for x
                   in (no_null_hostname, hostname))

    def is_hostname_allowed(self, hostname):
        """Return False if ``hostname`` matches any blacklisted pattern."""
        # Sometimes (like with "external" services that your IP has privileged
        # access to) you might not always know the IP range to blacklist access
        # to, or the `A` record might change without you noticing.
        # For e.x.: `foocorp.external.org`.
        #
        # Another option is doing something like:
        #
        #     for addrinfo in socket.getaddrinfo("foocorp.external.org", 80):
        #         global_validator.ip_blacklist.add(ip_address(addrinfo[4][0]))
        #
        # but that's not always a good idea if they're behind a third-party lb.
        for pattern in self.hostname_blacklist:
            if self._hostname_matches_pattern(hostname, pattern):
                return False
        return True

    @add_local_address_arg
    def is_addrinfo_allowed(self, addrinfo, _local_addresses=None):
        """Return True if a ``getaddrinfo``-style record may be connected to.

        Args:
            addrinfo: 5-tuple ``(family, socktype, proto, canonname,
                sockaddr)`` where ``sockaddr`` has 2 (IPv4) or 4 (IPv6)
                items.
            _local_addresses: networks that refer to this machine; must be
                passed by keyword.

        Raises:
            ValueError: if ``sockaddr`` has an unexpected length.
            NameserverException: if a hostname blacklist is configured but
                the record carries no canonical name.
        """
        assert len(addrinfo) == 5
        # XXX: Do we care about any of the other elements? Guessing not.
        family, socktype, proto, canonname, sockaddr = addrinfo

        # The 5th elem in addrinfo may either be a tuple of two or four items,
        # depending on whether we're dealing with IPv4 or v6
        if len(sockaddr) == 2:
            # v4
            ip, port = sockaddr
        elif len(sockaddr) == 4:
            # v6
            # XXX: what *are* `flow_info` and `scope_id`? Anything useful?
            # Seems like we can figure out all we need about the scope from
            # the `is_<x>` properties.
            ip, port, flow_info, scope_id = sockaddr
        else:
            raise ValueError("Unexpected addrinfo format %r" % sockaddr)

        # Probably won't help protect against SSRF, but might prevent our being
        # used to attack others' non-HTTP services. See
        # http://www.remote.org/jochen/sec/hfpa/
        if self.port_whitelist and port not in self.port_whitelist:
            return False
        if port in self.port_blacklist:
            return False

        if self.hostname_blacklist:
            if not canonname:
                raise NameserverException(
                    "addrinfo must contain the canon name to do blacklisting "
                    "based on hostname. Make sure you use the "
                    "`socket.AI_CANONNAME` flag, and that each record contains "
                    "the canon name. Your DNS server might also be garbage."
                )

            if not self.is_hostname_allowed(canonname):
                return False

        return self.is_ip_allowed(ip, _local_addresses=_local_addresses)