v0.0.1
This commit is contained in:
@@ -0,0 +1,29 @@
|
||||
# This software is licensed by the MIT License, see LICENSE file
|
||||
# Copyright © 2024 Gregory Lirent
|
||||
|
||||
from fastapi import FastAPI, Request, Response, HTTPException
|
||||
from .models import PACContent
|
||||
|
||||
pac: PACContent | None = None
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
def init(filename: str | None = None):
|
||||
global pac
|
||||
|
||||
if not filename:
|
||||
pac = None
|
||||
else:
|
||||
pac = PACContent(filename)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root(request: Request, view: bool = False):
|
||||
if not pac:
|
||||
raise HTTPException(status_code=500, detail="Config is not set")
|
||||
|
||||
print(request.headers.get('X-Real-IP', None) or request.client.host)
|
||||
|
||||
return Response(status_code=200, media_type="application/javascript",
|
||||
content=pac.render_for(request.headers.get('X-Real-IP', None) or request.client.host),
|
||||
headers=None if view else {"Content-Disposition": "attachment; filename=wpad.dat"})
|
||||
+137
@@ -0,0 +1,137 @@
|
||||
# This software is licensed by the MIT License, see LICENSE file
|
||||
# Copyright © 2024 Gregory Lirent
|
||||
|
||||
import time
|
||||
import os
|
||||
import yaml
|
||||
from typing import List
|
||||
from ipaddress import ip_network, IPv4Network, IPv4Address, IPv6Network, IPv6Address
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class LogLevel(Enum):
|
||||
critical = 'critical'
|
||||
error = 'error'
|
||||
warning = 'warning'
|
||||
info = 'info'
|
||||
debug = 'debug'
|
||||
trace = 'trace'
|
||||
|
||||
def __str__(self):
|
||||
return self.value
|
||||
|
||||
|
||||
class Proxy:
|
||||
|
||||
@staticmethod
|
||||
def __in_net(net: IPv4Network | IPv6Network, dest: IPv4Network | IPv6Network) -> bool:
|
||||
return net.network_address in dest and net.broadcast_address in dest
|
||||
|
||||
@staticmethod
|
||||
def __fetch_networks(net_list: List[str]) -> List[IPv4Network | IPv6Network]:
|
||||
networks = []
|
||||
if net_list:
|
||||
for net in net_list:
|
||||
networks.append(ip_network(net))
|
||||
return networks
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return f'proxy{hash(self)}'
|
||||
|
||||
@property
|
||||
def function_name(self):
|
||||
return f'is{self.name.capitalize()}'
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
return self.__host
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
return self.__port
|
||||
|
||||
def is_allowed_from(self, net: str | IPv4Address | IPv6Address | IPv4Network | IPv6Network) -> bool:
|
||||
if isinstance(net, (str, IPv4Address, IPv4Network)):
|
||||
net = ip_network(str(net))
|
||||
|
||||
if self.__deny:
|
||||
for dest in self.__deny:
|
||||
if self.__in_net(net, dest):
|
||||
return False
|
||||
|
||||
if not self.__allow:
|
||||
return True
|
||||
|
||||
for dest in self.__allow:
|
||||
if net.network_address in dest and net.broadcast_address in dest:
|
||||
return True
|
||||
return False
|
||||
|
||||
def render(self):
|
||||
content = f'function is{self.name}(h) {{\n'
|
||||
for target in self.__targets:
|
||||
content += f'\tif (shExpMatch(h, "{target}")) return true;\n'
|
||||
return f'{content}\treturn false;\n}}'
|
||||
|
||||
def __hash__(self):
|
||||
return abs(hash(str(self)))
|
||||
|
||||
def __str__(self):
|
||||
return f'PROXY {self.__host}:{self.__port}'
|
||||
|
||||
def __init__(self, *, host: str, port: int,
|
||||
allow: List[str] | None = None,
|
||||
deny: List[str] | None = None,
|
||||
targets: List[str] | None = None):
|
||||
|
||||
self.__host = host
|
||||
self.__port = port
|
||||
self.__allow = self.__fetch_networks(allow)
|
||||
self.__deny = self.__fetch_networks(deny)
|
||||
self.__targets = targets if targets else []
|
||||
|
||||
if not self.__host or not self.__port:
|
||||
raise ValueError("Proxy's host and port cannot be empty")
|
||||
if not self.__targets:
|
||||
raise ValueError("Pointless proxy")
|
||||
|
||||
|
||||
class PACContent:
|
||||
|
||||
@property
|
||||
def proxies(self) -> List[Proxy]:
|
||||
if not os.path.exists(self.__filename):
|
||||
return []
|
||||
|
||||
if os.stat(self.__filename).st_mtime >= self.__atime:
|
||||
with open(self.__filename, "r") as fd:
|
||||
conf: dict = yaml.safe_load(fd)
|
||||
|
||||
self.__atime = int(time.time())
|
||||
self.__proxies = []
|
||||
for proxy in conf.get('proxy_servers', []):
|
||||
try:
|
||||
self.__proxies.append(Proxy(**proxy))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return self.__proxies
|
||||
|
||||
def render_for(self, net: str | IPv4Address | IPv6Address | IPv6Network | IPv4Network):
|
||||
f_blocks = []
|
||||
p_blocks = ["function FindProxyForURL(url, host) {"]
|
||||
|
||||
for proxy in self.proxies:
|
||||
if proxy.is_allowed_from(net):
|
||||
f_blocks.append(proxy.render())
|
||||
p_blocks.append(f'\tif ({proxy.function_name}(host) return "{str(proxy)}";')
|
||||
p_blocks.append('\treturn "DIRECT";\n}')
|
||||
f_blocks.append("\n".join(p_blocks))
|
||||
|
||||
return "\n\n".join(f_blocks)
|
||||
|
||||
def __init__(self, filename: str):
|
||||
self.__filename = filename
|
||||
self.__atime = 0
|
||||
self.__proxies = []
|
||||
Reference in New Issue
Block a user