PACServer/app/models.py

135 lines
3.9 KiB
Python
Raw Permalink Normal View History

2024-05-29 23:27:43 +03:00
# This software is licensed by the MIT License, see LICENSE file
# Copyright © 2024 Gregory Lirent
import time
import os
import yaml
from typing import List
from ipaddress import ip_network, IPv4Network, IPv4Address, IPv6Network, IPv6Address
from enum import Enum
class LogLevel(Enum):
critical = 'critical'
error = 'error'
warning = 'warning'
info = 'info'
debug = 'debug'
trace = 'trace'
def __str__(self):
return self.value
class Proxy:
@staticmethod
def __in_net(net: IPv4Network | IPv6Network, dest: IPv4Network | IPv6Network) -> bool:
return net.network_address in dest and net.broadcast_address in dest
@staticmethod
def __fetch_networks(net_list: List[str]) -> List[IPv4Network | IPv6Network]:
networks = []
if net_list:
for net in net_list:
networks.append(ip_network(net))
return networks
@property
def name(self):
return f'proxy{hash(self)}'
@property
def function_name(self):
return f'is{self.name.capitalize()}'
@property
def host(self):
return self.__host
@property
def port(self) -> int:
return self.__port
def is_allowed_from(self, net: str | IPv4Address | IPv6Address | IPv4Network | IPv6Network) -> bool:
if isinstance(net, (str, IPv4Address, IPv4Network)):
net = ip_network(str(net))
if self.__deny:
for dest in self.__deny:
if self.__in_net(net, dest):
return False
if not self.__allow:
return True
for dest in self.__allow:
if net.network_address in dest and net.broadcast_address in dest:
return True
return False
def render(self):
2024-05-30 01:44:50 +03:00
content = f'function {self.function_name}(h) {{\n'
2024-05-29 23:27:43 +03:00
for target in self.__targets:
content += f'\tif (shExpMatch(h, "{target}")) return true;\n'
return f'{content}\treturn false;\n}}'
def __str__(self):
return f'PROXY {self.__host}:{self.__port}'
def __init__(self, *, host: str, port: int,
allow: List[str] | None = None,
deny: List[str] | None = None,
targets: List[str] | None = None):
self.__host = host
self.__port = port
self.__allow = self.__fetch_networks(allow)
self.__deny = self.__fetch_networks(deny)
self.__targets = targets if targets else []
if not self.__host or not self.__port:
raise ValueError("Proxy's host and port cannot be empty")
if not self.__targets:
raise ValueError("Pointless proxy")
class PACContent:
@property
def proxies(self) -> List[Proxy]:
if not os.path.exists(self.__filename):
return []
if os.stat(self.__filename).st_mtime >= self.__atime:
with open(self.__filename, "r") as fd:
conf: dict = yaml.safe_load(fd)
self.__atime = int(time.time())
self.__proxies = []
for proxy in conf.get('proxy_servers', []):
try:
self.__proxies.append(Proxy(**proxy))
except ValueError:
pass
return self.__proxies
def render_for(self, net: str | IPv4Address | IPv6Address | IPv6Network | IPv4Network):
f_blocks = []
p_blocks = ["function FindProxyForURL(url, host) {"]
for proxy in self.proxies:
if proxy.is_allowed_from(net):
f_blocks.append(proxy.render())
2024-05-30 01:34:23 +03:00
p_blocks.append(f'\tif ({proxy.function_name}(host)) return "{str(proxy)}";')
2024-05-29 23:27:43 +03:00
p_blocks.append('\treturn "DIRECT";\n}')
f_blocks.append("\n".join(p_blocks))
return "\n\n".join(f_blocks)
def __init__(self, filename: str):
self.__filename = filename
self.__atime = 0
self.__proxies = []