135 lines
3.9 KiB
Python
135 lines
3.9 KiB
Python
# This software is licensed by the MIT License, see LICENSE file
|
|
# Copyright © 2024 Gregory Lirent
|
|
|
|
import time
|
|
import os
|
|
import yaml
|
|
from typing import List
|
|
from ipaddress import ip_network, IPv4Network, IPv4Address, IPv6Network, IPv6Address
|
|
from enum import Enum
|
|
|
|
|
|
class LogLevel(Enum):
    """Closed set of log verbosity names.

    Each member's value is its own lowercase name, and ``str(member)``
    yields that value, so members can be interpolated directly into text.
    Members are declared from most to least severe.
    """

    critical = "critical"
    error = "error"
    warning = "warning"
    info = "info"
    debug = "debug"
    trace = "trace"

    def __str__(self) -> str:
        """Return the plain level name rather than ``LogLevel.<name>``."""
        level_text = self.value
        return level_text
|
|
|
|
|
|
class Proxy:
|
|
|
|
@staticmethod
|
|
def __in_net(net: IPv4Network | IPv6Network, dest: IPv4Network | IPv6Network) -> bool:
|
|
return net.network_address in dest and net.broadcast_address in dest
|
|
|
|
@staticmethod
|
|
def __fetch_networks(net_list: List[str]) -> List[IPv4Network | IPv6Network]:
|
|
networks = []
|
|
if net_list:
|
|
for net in net_list:
|
|
networks.append(ip_network(net))
|
|
return networks
|
|
|
|
@property
|
|
def name(self):
|
|
return f'proxy{hash(self)}'
|
|
|
|
@property
|
|
def function_name(self):
|
|
return f'is{self.name.capitalize()}'
|
|
|
|
@property
|
|
def host(self):
|
|
return self.__host
|
|
|
|
@property
|
|
def port(self) -> int:
|
|
return self.__port
|
|
|
|
def is_allowed_from(self, net: str | IPv4Address | IPv6Address | IPv4Network | IPv6Network) -> bool:
|
|
if isinstance(net, (str, IPv4Address, IPv4Network)):
|
|
net = ip_network(str(net))
|
|
|
|
if self.__deny:
|
|
for dest in self.__deny:
|
|
if self.__in_net(net, dest):
|
|
return False
|
|
|
|
if not self.__allow:
|
|
return True
|
|
|
|
for dest in self.__allow:
|
|
if net.network_address in dest and net.broadcast_address in dest:
|
|
return True
|
|
return False
|
|
|
|
def render(self):
|
|
content = f'function is{self.name}(h) {{\n'
|
|
for target in self.__targets:
|
|
content += f'\tif (shExpMatch(h, "{target}")) return true;\n'
|
|
return f'{content}\treturn false;\n}}'
|
|
|
|
def __str__(self):
|
|
return f'PROXY {self.__host}:{self.__port}'
|
|
|
|
def __init__(self, *, host: str, port: int,
|
|
allow: List[str] | None = None,
|
|
deny: List[str] | None = None,
|
|
targets: List[str] | None = None):
|
|
|
|
self.__host = host
|
|
self.__port = port
|
|
self.__allow = self.__fetch_networks(allow)
|
|
self.__deny = self.__fetch_networks(deny)
|
|
self.__targets = targets if targets else []
|
|
|
|
if not self.__host or not self.__port:
|
|
raise ValueError("Proxy's host and port cannot be empty")
|
|
if not self.__targets:
|
|
raise ValueError("Pointless proxy")
|
|
|
|
|
|
class PACContent:
|
|
|
|
@property
|
|
def proxies(self) -> List[Proxy]:
|
|
if not os.path.exists(self.__filename):
|
|
return []
|
|
|
|
if os.stat(self.__filename).st_mtime >= self.__atime:
|
|
with open(self.__filename, "r") as fd:
|
|
conf: dict = yaml.safe_load(fd)
|
|
|
|
self.__atime = int(time.time())
|
|
self.__proxies = []
|
|
for proxy in conf.get('proxy_servers', []):
|
|
try:
|
|
self.__proxies.append(Proxy(**proxy))
|
|
except ValueError:
|
|
pass
|
|
|
|
return self.__proxies
|
|
|
|
def render_for(self, net: str | IPv4Address | IPv6Address | IPv6Network | IPv4Network):
|
|
f_blocks = []
|
|
p_blocks = ["function FindProxyForURL(url, host) {"]
|
|
|
|
for proxy in self.proxies:
|
|
if proxy.is_allowed_from(net):
|
|
f_blocks.append(proxy.render())
|
|
p_blocks.append(f'\tif ({proxy.function_name}(host) return "{str(proxy)}";')
|
|
p_blocks.append('\treturn "DIRECT";\n}')
|
|
f_blocks.append("\n".join(p_blocks))
|
|
|
|
return "\n\n".join(f_blocks)
|
|
|
|
def __init__(self, filename: str):
|
|
self.__filename = filename
|
|
self.__atime = 0
|
|
self.__proxies = []
|