# This software is licensed by the MIT License, see LICENSE file # Copyright © 2024 Gregory Lirent import time import os import yaml from typing import List from ipaddress import ip_network, IPv4Network, IPv4Address, IPv6Network, IPv6Address from enum import Enum class LogLevel(Enum): critical = 'critical' error = 'error' warning = 'warning' info = 'info' debug = 'debug' trace = 'trace' def __str__(self): return self.value class Proxy: @staticmethod def __in_net(net: IPv4Network | IPv6Network, dest: IPv4Network | IPv6Network) -> bool: return net.network_address in dest and net.broadcast_address in dest @staticmethod def __fetch_networks(net_list: List[str]) -> List[IPv4Network | IPv6Network]: networks = [] if net_list: for net in net_list: networks.append(ip_network(net)) return networks @property def name(self): return f'proxy{hash(self)}' @property def function_name(self): return f'is{self.name.capitalize()}' @property def host(self): return self.__host @property def port(self) -> int: return self.__port def is_allowed_from(self, net: str | IPv4Address | IPv6Address | IPv4Network | IPv6Network) -> bool: if isinstance(net, (str, IPv4Address, IPv4Network)): net = ip_network(str(net)) if self.__deny: for dest in self.__deny: if self.__in_net(net, dest): return False if not self.__allow: return True for dest in self.__allow: if net.network_address in dest and net.broadcast_address in dest: return True return False def render(self): content = f'function {self.function_name}(h) {{\n' for target in self.__targets: content += f'\tif (shExpMatch(h, "{target}")) return true;\n' return f'{content}\treturn false;\n}}' def __str__(self): return f'PROXY {self.__host}:{self.__port}' def __init__(self, *, host: str, port: int, allow: List[str] | None = None, deny: List[str] | None = None, targets: List[str] | None = None): self.__host = host self.__port = port self.__allow = self.__fetch_networks(allow) self.__deny = self.__fetch_networks(deny) self.__targets = targets if targets else [] if not self.__host or not self.__port: raise ValueError("Proxy's host and port cannot be empty") if not self.__targets: raise ValueError("Pointless proxy") class PACContent: @property def proxies(self) -> List[Proxy]: if not os.path.exists(self.__filename): return [] if os.stat(self.__filename).st_mtime >= self.__atime: with open(self.__filename, "r") as fd: conf: dict = yaml.safe_load(fd) self.__atime = int(time.time()) self.__proxies = [] for proxy in conf.get('proxy_servers', []): try: self.__proxies.append(Proxy(**proxy)) except ValueError: pass return self.__proxies def render_for(self, net: str | IPv4Address | IPv6Address | IPv6Network | IPv4Network): f_blocks = [] p_blocks = ["function FindProxyForURL(url, host) {"] for proxy in self.proxies: if proxy.is_allowed_from(net): f_blocks.append(proxy.render()) p_blocks.append(f'\tif ({proxy.function_name}(host)) return "{str(proxy)}";') p_blocks.append('\treturn "DIRECT";\n}') f_blocks.append("\n".join(p_blocks)) return "\n\n".join(f_blocks) def __init__(self, filename: str): self.__filename = filename self.__atime = 0 self.__proxies = []