Merge branch 'python-josepy' into 'main'

Migrate from "python-jose" to an alternative

See merge request oscar.krause/fastapi-dls!52
This commit is contained in:
Oscar Krause 2025-07-07 07:52:12 +02:00
commit 8aff954829
10 changed files with 68 additions and 41 deletions

View File

@ -2,7 +2,7 @@ Package: fastapi-dls
Version: 0.0
Architecture: all
Maintainer: Oscar Krause oscar.krause@collinwebdesigns.de
Depends: python3, python3-fastapi, python3-uvicorn, python3-dotenv, python3-dateutil, python3-jose, python3-sqlalchemy, python3-cryptography, python3-markdown, uvicorn, openssl
Depends: python3, python3-fastapi, python3-uvicorn, python3-dotenv, python3-dateutil, python3-jwt, python3-sqlalchemy, python3-cryptography, python3-markdown, uvicorn, openssl
Recommends: curl
Installed-Size: 10240
Homepage: https://git.collinwebdesigns.de/oscar.krause/fastapi-dls

View File

@ -1,7 +1,7 @@
# https://packages.debian.org/hu/
fastapi==0.92.0
uvicorn[standard]==0.17.6
python-jose[cryptography]==3.3.0
pyjwt==2.10.1
cryptography==38.0.4
python-dateutil==2.8.2
sqlalchemy==1.4.46

View File

@ -1,7 +1,7 @@
# https://packages.ubuntu.com
fastapi==0.101.0
uvicorn[standard]==0.27.1
python-jose[cryptography]==3.3.0
pyjwt==2.10.1
cryptography==41.0.7
python-dateutil==2.8.2
sqlalchemy==1.4.50

View File

@ -1,7 +1,7 @@
# https://packages.ubuntu.com
fastapi==0.110.3
uvicorn[standard]==0.30.3
python-jose[cryptography]==3.3.0
pyjwt==2.10.1
cryptography==42.0.5
python-dateutil==2.9.0
sqlalchemy==2.0.32

View File

@ -8,7 +8,7 @@ pkgdesc='NVIDIA DLS server implementation with FastAPI'
arch=('any')
url='https://git.collinwebdesigns.de/oscar.krause/fastapi-dls'
license=('MIT')
depends=('python' 'python-jose' 'python-starlette' 'python-httpx' 'python-fastapi' 'python-dotenv' 'python-dateutil' 'python-sqlalchemy' 'python-cryptography' 'uvicorn' 'python-markdown' 'openssl')
depends=('python' 'python-pyjwt' 'python-starlette' 'python-httpx' 'python-fastapi' 'python-dotenv' 'python-dateutil' 'python-sqlalchemy' 'python-cryptography' 'uvicorn' 'python-markdown' 'openssl')
provider=("$pkgname")
install="$pkgname.install"
backup=('etc/default/fastapi-dls')

View File

@ -172,12 +172,12 @@ test:apt:
parallel:
matrix:
- IMAGE:
# - debian:trixie-slim # EOL: t.b.a.; "python3-jose" not available, but "python3-josepy"
- debian:bookworm-slim # EOL: June 06, 2026
- debian:trixie-slim # EOL: t.b.a.; "python3-jose" not available, but "python3-josepy" or "python3-jwt"
- debian:bookworm-slim # EOL: June 06, 2026
- debian:bullseye-slim # EOL: June 06, 2026
- ubuntu:24.04 # EOL: April 2036
# - ubuntu:24.10 # EOL: t.b.a.; "python3-jose" not available, but "python3-josepy"
# - ubuntu:25.04 # EOL: t.b.a.; "python3-jose" not available, but "python3-josepy"
- ubuntu:24.10 # EOL: t.b.a.; "python3-jose" not available, but "python3-josepy" or "python3-jwt"
- ubuntu:25.04 # EOL: t.b.a.; "python3-jose" not available, but "python3-josepy" or "python3-jwt"
needs:
- job: build:apt
artifacts: true

View File

@ -336,10 +336,7 @@ Successful tested with (**LTS Version**):
Not working with:
- Debian 11 (Bullseye) and lower (missing `python-jose` dependency)
- Debian 13 (Trixie) (missing `python-jose` dependency)
- Ubuntu 22.04 (Jammy Jellyfish) (not supported as for 15.01.2023 due to [fastapi - uvicorn version missmatch](https://bugs.launchpad.net/ubuntu/+source/fastapi/+bug/1970557))
- Ubuntu 24.10 (Oracular Oriole) (missing `python-jose` dependency)
**Run this on your server instance**

View File

@ -5,8 +5,8 @@ from contextlib import asynccontextmanager
from datetime import datetime, timedelta, UTC
from hashlib import sha256
from json import loads as json_loads, dumps as json_dumps
from os import getenv as env
from os.path import join, dirname
from os import getenv as env, listdir
from os.path import join, dirname, isfile, isdir, exists
from textwrap import wrap
from uuid import uuid4
@ -14,9 +14,9 @@ from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import Response, RedirectResponse, StreamingResponse
from jose import jws, jwk, jwt, JWTError
from jose.constants import ALGORITHMS
import jwt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from starlette.middleware.cors import CORSMiddleware
@ -50,6 +50,7 @@ LEASE_RENEWAL_PERIOD = float(env('LEASE_RENEWAL_PERIOD', 0.15))
LEASE_RENEWAL_DELTA = timedelta(days=int(env('LEASE_EXPIRE_DAYS', 90)), hours=int(env('LEASE_EXPIRE_HOURS', 0)))
CLIENT_TOKEN_EXPIRE_DELTA = relativedelta(years=12)
CORS_ORIGINS = str(env('CORS_ORIGINS', '')).split(',') if (env('CORS_ORIGINS')) else [f'https://{DLS_URL}']
DRIVERS_DIR = env('DRIVERS_DIR', None)
DT_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
PRODUCT_MAPPING = ProductMapping(filename=join(dirname(__file__), 'static/product_mapping.json'))
@ -63,8 +64,8 @@ my_si_certificate = Cert.from_file(ca_setup.si_certificate_filename)
my_si_private_key = PrivateKey.from_file(ca_setup.si_private_key_filename)
my_si_public_key = my_si_private_key.public_key()
jwt_encode_key = jwk.construct(my_si_private_key.pem(), algorithm=ALGORITHMS.RS256)
jwt_decode_key = jwk.construct(my_si_private_key.public_key().pem(), algorithm=ALGORITHMS.RS256)
jwt_encode_key = my_si_private_key.pem() # todo: replace directly in code
jwt_decode_key = my_si_private_key.public_key().pem() # todo: replace directly in code
# Logging
LOG_LEVEL = logging.DEBUG if DEBUG else logging.INFO
@ -100,6 +101,9 @@ async def lifespan(_: FastAPI):
config = dict(openapi_url=None, docs_url=None, redoc_url=None) # dict(openapi_url='/-/openapi.json', docs_url='/-/docs', redoc_url='/-/redoc')
app = FastAPI(title='FastAPI-DLS', description='Minimal Delegated License Service (DLS).', version=VERSION, lifespan=lifespan, **config)
if DRIVERS_DIR is not None:
app.mount('/-/static-drivers', StaticFiles(directory=str(DRIVERS_DIR), html=False), name='drivers')
app.debug = DEBUG
app.add_middleware(
CORSMiddleware,
@ -114,7 +118,9 @@ app.add_middleware(
def __get_token(request: Request) -> dict:
authorization_header = request.headers.get('authorization')
token = authorization_header.split(' ')[1]
return jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
# return jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
return jwt.decode(jwt=token, key=jwt_decode_key, algorithms=['RS256'], options={'verify_aud': False})
# Endpoints
@ -206,6 +212,25 @@ async def _manage(request: Request):
return Response(response, media_type='text/html', status_code=200)
@app.get('/-/drivers/{directory:path}', summary='* List drivers directory')
async def _drivers(request: Request, directory: str | None):
if DRIVERS_DIR is None:
return Response(status_code=404, content=f'Variable "DRIVERS_DIR" not set.')
path = join(DRIVERS_DIR, directory)
if not exists(path) and not isfile(path):
return Response(status_code=404, content=f'Resource "{path}" not found!')
content = [{
"type": "file" if isfile(f'{path}/{_}') else "folder" if isdir(f'{path}/{_}') else "unknown",
"name": _,
"link": f'/-/static-drivers/{directory}{_}',
} for _ in listdir(path)]
return Response(content=json_dumps({"directory": path, "content": content}), media_type='application/json', status_code=200)
@app.get('/-/origins', summary='* Origins')
async def _origins(request: Request, leases: bool = False):
session = sessionmaker(bind=db)()
@ -295,9 +320,11 @@ async def _client_token():
},
}
content = jws.sign(payload, key=jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
# content = jws.sign(payload, key=jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
content = jwt.encode(payload=payload, key=jwt_encode_key, headers=None, algorithm='RS256')
response = StreamingResponse(iter([content]), media_type="text/plain")
# response = StreamingResponse(iter([content]), media_type="text/plain")
response = StreamingResponse(iter(content), media_type="text/plain")
filename = f'client_configuration_token_{datetime.now().strftime("%d-%m-%y-%H-%M-%S")}.tok'
response.headers["Content-Disposition"] = f'attachment; filename={filename}'
@ -386,7 +413,8 @@ async def auth_v1_code(request: Request):
'kid': SITE_KEY_XID
}
auth_code = jws.sign(payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm=ALGORITHMS.RS256)
# auth_code = jws.sign(payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm=ALGORITHMS.RS256)
auth_code = jwt.encode(payload=payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm='RS256')
response = {
"auth_code": auth_code,
@ -404,8 +432,9 @@ async def auth_v1_token(request: Request):
j, cur_time = json_loads((await request.body()).decode('utf-8')), datetime.now(UTC)
try:
payload = jwt.decode(token=j.get('auth_code'), key=jwt_decode_key, algorithms=ALGORITHMS.RS256)
except JWTError as e:
#payload = jwt.decode(token=j.get('auth_code'), key=jwt_decode_key, algorithms=ALGORITHMS.RS256)
payload = jwt.decode(jwt=j.get('auth_code'), key=jwt_decode_key, algorithms=['RS256'])
except jwt.PyJWTError as e:
response = {'status': 400, 'title': 'invalid token', 'detail': str(e)}
return Response(content=json_dumps(response), media_type='application/json', status_code=400)
@ -431,7 +460,7 @@ async def auth_v1_token(request: Request):
'origin_ref': origin_ref,
}
auth_token = jwt.encode(new_payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm=ALGORITHMS.RS256)
auth_token = jwt.encode(payload=new_payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm='RS256')
response = {
"auth_token": auth_token,
@ -470,8 +499,9 @@ async def leasing_v1_config_token(request: Request):
},
}
my_jwt_encode_key = jwk.construct(my_si_private_key.pem().decode('utf-8'), algorithm=ALGORITHMS.RS256)
config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
# my_jwt_encode_key = jwk.construct(my_si_private_key.pem().decode('utf-8'), algorithm=ALGORITHMS.RS256)
# config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
config_token = jwt.encode(payload=payload, key=jwt_encode_key, headers=None, algorithm='RS256')
response_ca_chain = my_ca_certificate.pem().decode('utf-8').strip()
@ -702,7 +732,7 @@ async def leasing_v1_lessor_shutdown(request: Request):
j, cur_time = json_loads((await request.body()).decode('utf-8')), datetime.now(UTC)
token = j.get('token')
token = jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
token = jwt.decode(jwt=token, key=jwt_decode_key, algorithms='RS256', options={'verify_aud': False})
origin_ref = token.get('origin_ref')
released_lease_list = list(map(lambda x: x.lease_ref, Lease.find_by_origin_ref(db, origin_ref)))

View File

@ -1,6 +1,6 @@
fastapi==0.115.14
uvicorn[standard]==0.35.0
python-jose[cryptography]==3.5.0
pyjwt==2.10.1
cryptography==45.0.5
python-dateutil==2.9.0
sqlalchemy==2.0.41

View File

@ -4,13 +4,13 @@ from base64 import b64encode as b64enc
from calendar import timegm
from datetime import datetime, UTC
from hashlib import sha256
from json import loads as json_loads, dumps as json_dumps
from uuid import uuid4, UUID
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import SHA256
from dateutil.relativedelta import relativedelta
from jose import jwt, jwk, jws
from jose.constants import ALGORITHMS
import jwt
from starlette.testclient import TestClient
# add relative path to use packages as they were in the app/ dir
@ -38,12 +38,12 @@ my_si_public_key = my_si_private_key.public_key()
my_si_public_key_as_pem = my_si_private_key.public_key().pem()
my_si_certificate = Cert.from_file(ca_setup.si_certificate_filename)
jwt_encode_key = jwk.construct(my_si_private_key_as_pem, algorithm=ALGORITHMS.RS256)
jwt_decode_key = jwk.construct(my_si_public_key_as_pem, algorithm=ALGORITHMS.RS256)
jwt_encode_key = my_si_private_key.pem()
jwt_decode_key = my_si_private_key.public_key().pem()
def __bearer_token(origin_ref: str) -> str:
token = jwt.encode({"origin_ref": origin_ref}, key=jwt_encode_key, algorithm=ALGORITHMS.RS256)
# token = jwt.encode({"origin_ref": origin_ref}, key=jwt_encode_key, algorithm=ALGORITHMS.RS256)
token = jwt.encode(payload={"origin_ref": origin_ref}, key=jwt_encode_key, algorithm='RS256')
token = f'Bearer {token}'
return token
@ -145,12 +145,12 @@ def test_config_token():
assert nv_si_certificate.public_key().mod() == nv_response_public_key.get('mod')[0]
assert nv_si_certificate.authority_key_identifier() == nv_ca_chain.subject_key_identifier()
nv_jwt_decode_key = jwk.construct(nv_response_public_cert, algorithm=ALGORITHMS.RS256)
# nv_jwt_decode_key = jwk.construct(nv_response_public_cert, algorithm=ALGORITHMS.RS256)
nv_response_config_token = response.json().get('configToken')
payload = jws.verify(nv_response_config_token, key=nv_jwt_decode_key, algorithms=ALGORITHMS.RS256)
payload = json.loads(payload)
#payload = jws.verify(nv_response_config_token, key=nv_jwt_decode_key, algorithms=ALGORITHMS.RS256)
payload = jwt.decode(jwt=nv_response_config_token, key=nv_si_certificate.public_key().pem(), algorithms=['RS256'], options={'verify_signature': False})
assert payload.get('iss') == 'NLS Service Instance'
assert payload.get('aud') == 'NLS Licensed Client'
assert payload.get('service_instance_ref') == INSTANCE_REF
@ -230,7 +230,7 @@ def test_auth_v1_code():
response = client.post('/auth/v1/code', json=payload)
assert response.status_code == 200
payload = jwt.get_unverified_claims(token=response.json().get('auth_code'))
payload = jwt.decode(response.json().get('auth_code'), key=my_si_public_key_as_pem, algorithms=['RS256'])
assert payload.get('origin_ref') == ORIGIN_REF
@ -247,7 +247,7 @@ def test_auth_v1_token():
"kid": "00000000-0000-0000-0000-000000000000"
}
payload = {
"auth_code": jwt.encode(payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm=ALGORITHMS.RS256),
"auth_code": jwt.encode(payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm='RS256'),
"code_verifier": SECRET,
}
@ -255,7 +255,7 @@ def test_auth_v1_token():
assert response.status_code == 200
token = response.json().get('auth_token')
payload = jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
payload = jwt.decode(token, key=jwt_decode_key, algorithms=['RS256'], options={'verify_signature': False})
assert payload.get('origin_ref') == ORIGIN_REF