fastapi-dls/test/main.py

371 lines
13 KiB
Python
Raw Normal View History

import json
import sys
2022-12-28 16:30:54 +03:00
from base64 import b64encode as b64enc
from calendar import timegm
from datetime import datetime, UTC
from hashlib import sha256
from uuid import uuid4, UUID
2022-12-23 15:42:02 +03:00
2025-04-22 15:38:17 +03:00
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import SHA256
2022-12-28 16:30:54 +03:00
from dateutil.relativedelta import relativedelta
from jose import jwt, jwk, jws
2022-12-28 16:30:54 +03:00
from jose.constants import ALGORITHMS
2022-12-23 09:35:37 +03:00
from starlette.testclient import TestClient
2022-12-23 13:24:40 +03:00
# add relative path to use packages as they were in the app/ dir
sys.path.append('../')
sys.path.append('../app')
2022-12-23 09:35:37 +03:00
2022-12-23 13:24:40 +03:00
from app import main
2025-04-16 09:37:28 +03:00
from util import CASetup, PrivateKey, PublicKey, Cert
2022-12-23 09:35:37 +03:00
client = TestClient(main.app)
# Instance
INSTANCE_REF = '10000000-0000-0000-0000-000000000001'
ORIGIN_REF, ALLOTMENT_REF, SECRET = str(uuid4()), '20000000-0000-0000-0000-000000000001', 'HelloWorld'
2022-12-28 16:30:54 +03:00
# CA & Signing
ca_setup = CASetup(service_instance_ref=INSTANCE_REF)
2025-04-22 15:16:30 +03:00
my_root_private_key = PrivateKey.from_file(ca_setup.root_private_key_filename)
2025-04-16 09:37:28 +03:00
my_root_certificate = Cert.from_file(ca_setup.root_certificate_filename)
2025-04-22 15:16:30 +03:00
my_ca_certificate = Cert.from_file(ca_setup.ca_certificate_filename)
my_ca_private_key = PrivateKey.from_file(ca_setup.ca_private_key_filename)
my_si_private_key = PrivateKey.from_file(ca_setup.si_private_key_filename)
my_si_private_key_as_pem = my_si_private_key.pem()
my_si_public_key = my_si_private_key.public_key()
my_si_public_key_as_pem = my_si_private_key.public_key().pem()
2025-04-22 15:16:30 +03:00
my_si_certificate = Cert.from_file(ca_setup.si_certificate_filename)
2022-12-28 16:30:54 +03:00
jwt_encode_key = jwk.construct(my_si_private_key_as_pem, algorithm=ALGORITHMS.RS256)
jwt_decode_key = jwk.construct(my_si_public_key_as_pem, algorithm=ALGORITHMS.RS256)
2022-12-23 15:42:02 +03:00
2022-12-23 09:35:37 +03:00
2022-12-29 21:07:30 +03:00
def __bearer_token(origin_ref: str) -> str:
token = jwt.encode({"origin_ref": origin_ref}, key=jwt_encode_key, algorithm=ALGORITHMS.RS256)
token = f'Bearer {token}'
return token
2025-04-14 22:38:01 +03:00
def test_signing():
signature_set_header = my_si_private_key.generate_signature(b'Hello')
2025-04-14 22:38:01 +03:00
# test plain
my_si_public_key.verify_signature(signature_set_header, b'Hello')
2025-04-14 22:38:01 +03:00
# test "X-NLS-Signature: b'....'
x_nls_signature_header_value = f'{signature_set_header.hex().encode()}'
assert f'{x_nls_signature_header_value}'.startswith('b\'')
assert f'{x_nls_signature_header_value}'.endswith('\'')
# test eval
signature_get_header = eval(x_nls_signature_header_value)
signature_get_header = bytes.fromhex(signature_get_header.decode('ascii'))
my_si_public_key.verify_signature(signature_get_header, b'Hello')
2025-04-14 22:38:01 +03:00
2025-04-22 15:16:30 +03:00
def test_keypair_and_certificates():
assert my_root_certificate.public_key().mod() == my_root_private_key.public_key().mod()
assert my_ca_certificate.public_key().mod() == my_ca_private_key.public_key().mod()
assert my_si_certificate.public_key().mod() == my_si_public_key.mod()
assert len(my_root_certificate.public_key().mod()) == 1024
assert len(my_ca_certificate.public_key().mod()) == 1024
assert len(my_si_certificate.public_key().mod()) == 512
#assert my_si_certificate.public_key().mod() != my_si_public_key.mod()
2025-04-22 15:38:17 +03:00
my_root_certificate.public_key().raw().verify(
my_ca_certificate.raw().signature,
my_ca_certificate.raw().tbs_certificate_bytes,
PKCS1v15(),
SHA256(),
)
my_ca_certificate.public_key().raw().verify(
my_si_certificate.raw().signature,
my_si_certificate.raw().tbs_certificate_bytes,
PKCS1v15(),
SHA256(),
)
2025-04-22 15:16:30 +03:00
2022-12-23 09:35:37 +03:00
def test_index():
response = client.get('/')
assert response.status_code == 200
2022-12-29 12:37:47 +03:00
def test_health():
response = client.get('/-/health')
assert response.status_code == 200
assert response.json().get('status') == 'up'
2022-12-29 12:37:47 +03:00
def test_config():
2023-01-03 19:39:10 +03:00
response = client.get('/-/config')
assert response.status_code == 200
2025-04-16 09:37:28 +03:00
def test_config_root_ca():
2025-04-22 09:53:30 +03:00
response = client.get('/-/config/root-certificate')
2025-04-16 09:37:28 +03:00
assert response.status_code == 200
2025-04-22 15:16:30 +03:00
assert response.content.decode('utf-8').strip() == my_root_certificate.pem().decode('utf-8').strip()
2025-04-16 09:37:28 +03:00
2022-12-29 12:37:47 +03:00
def test_readme():
response = client.get('/-/readme')
assert response.status_code == 200
def test_manage():
response = client.get('/-/manage')
assert response.status_code == 200
2022-12-23 09:48:47 +03:00
def test_client_token():
2022-12-29 22:41:02 +03:00
response = client.get('/-/client-token')
assert response.status_code == 200
2025-04-16 13:01:19 +03:00
def test_config_token():
# https://git.collinwebdesigns.de/nvidia/nls/-/blob/main/src/test/test_config_token.py
response = client.post('/leasing/v1/config-token', json={"service_instance_ref": INSTANCE_REF})
assert response.status_code == 200
nv_response_certificate_configuration = response.json().get('certificateConfiguration')
2025-04-22 15:16:30 +03:00
nv_ca_chain = nv_response_certificate_configuration.get('caChain')[0].encode('utf-8')
nv_ca_chain = Cert(nv_ca_chain)
nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8')
2025-04-22 15:16:30 +03:00
nv_response_public_key = nv_response_certificate_configuration.get('publicKey')
nv_si_certificate = Cert(nv_response_public_cert)
assert nv_si_certificate.public_key().mod() == nv_response_public_key.get('mod')[0]
assert nv_si_certificate.authority_key_identifier() == nv_ca_chain.subject_key_identifier()
nv_jwt_decode_key = jwk.construct(nv_response_public_cert, algorithm=ALGORITHMS.RS256)
nv_response_config_token = response.json().get('configToken')
payload = jws.verify(nv_response_config_token, key=nv_jwt_decode_key, algorithms=ALGORITHMS.RS256)
payload = json.loads(payload)
assert payload.get('iss') == 'NLS Service Instance'
assert payload.get('aud') == 'NLS Licensed Client'
assert payload.get('service_instance_ref') == INSTANCE_REF
nv_si_public_key_configuration = payload.get('service_instance_public_key_configuration')
nv_si_public_key_me = nv_si_public_key_configuration.get('service_instance_public_key_me')
2025-04-22 09:53:30 +03:00
assert len(nv_si_public_key_me.get('mod')) == 512 # nv_si_public_key_mod
assert nv_si_public_key_me.get('exp') == 65537 # nv_si_public_key_exp
2022-12-29 12:37:47 +03:00
def test_origins():
pass
def test_origins_delete():
pass
def test_leases():
pass
def test_lease_delete():
pass
2022-12-23 09:48:47 +03:00
def test_auth_v1_origin():
payload = {
"registration_pending": False,
"environment": {
"guest_driver_version": "guest_driver_version",
"hostname": "myhost",
"ip_address_list": ["192.168.1.123"],
"os_version": "os_version",
"os_platform": "os_platform",
"fingerprint": {"mac_address_list": ["ff:ff:ff:ff:ff:ff"]},
"host_driver_version": "host_driver_version"
},
"update_pending": False,
2022-12-23 15:42:02 +03:00
"candidate_origin_ref": ORIGIN_REF,
2022-12-23 09:48:47 +03:00
}
2022-12-23 15:42:02 +03:00
2022-12-23 09:48:47 +03:00
response = client.post('/auth/v1/origin', json=payload)
assert response.status_code == 200
assert response.json().get('origin_ref') == ORIGIN_REF
2022-12-23 09:48:47 +03:00
2022-12-28 16:53:17 +03:00
def auth_v1_origin_update():
payload = {
"registration_pending": False,
"environment": {
"guest_driver_version": "guest_driver_version",
"hostname": "myhost",
"ip_address_list": ["192.168.1.123"],
"os_version": "os_version",
"os_platform": "os_platform",
"fingerprint": {"mac_address_list": ["ff:ff:ff:ff:ff:ff"]},
"host_driver_version": "host_driver_version"
},
"update_pending": False,
"candidate_origin_ref": ORIGIN_REF,
}
response = client.post('/auth/v1/origin/update', json=payload)
assert response.status_code == 200
assert response.json().get('origin_ref') == ORIGIN_REF
2022-12-28 16:53:17 +03:00
2022-12-23 09:48:47 +03:00
def test_auth_v1_code():
2022-12-23 15:42:02 +03:00
payload = {
2022-12-28 16:30:54 +03:00
"code_challenge": b64enc(sha256(SECRET.encode('utf-8')).digest()).rstrip(b'=').decode('utf-8'),
2022-12-23 15:42:02 +03:00
"origin_ref": ORIGIN_REF,
}
response = client.post('/auth/v1/code', json=payload)
assert response.status_code == 200
payload = jwt.get_unverified_claims(token=response.json().get('auth_code'))
assert payload.get('origin_ref') == ORIGIN_REF
2022-12-23 09:48:47 +03:00
def test_auth_v1_token():
cur_time = datetime.now(UTC)
2022-12-28 16:30:54 +03:00
access_expires_on = cur_time + relativedelta(hours=1)
payload = {
"iat": timegm(cur_time.timetuple()),
"exp": timegm(access_expires_on.timetuple()),
"challenge": b64enc(sha256(SECRET.encode('utf-8')).digest()).rstrip(b'=').decode('utf-8'),
"origin_ref": ORIGIN_REF,
"key_ref": "00000000-0000-0000-0000-000000000000",
"kid": "00000000-0000-0000-0000-000000000000"
}
payload = {
2024-12-02 12:03:35 +03:00
"auth_code": jwt.encode(payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm=ALGORITHMS.RS256),
2022-12-28 16:30:54 +03:00
"code_verifier": SECRET,
}
response = client.post('/auth/v1/token', json=payload)
assert response.status_code == 200
token = response.json().get('auth_token')
2022-12-28 16:30:54 +03:00
payload = jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
assert payload.get('origin_ref') == ORIGIN_REF
2022-12-23 09:48:47 +03:00
def test_leasing_v1_lessor():
2022-12-28 16:53:17 +03:00
payload = {
'client_challenge': 'my_unique_string',
2022-12-28 16:53:17 +03:00
'fulfillment_context': {
'fulfillment_class_ref_list': []
},
'lease_proposal_list': [{
'license_type_qualifiers': {'count': 1},
2025-04-14 21:05:10 +03:00
'product': {'name': 'NVIDIA Virtual Applications'}
2022-12-28 16:53:17 +03:00
}],
'proposal_evaluation_mode': 'ALL_OF',
'scope_ref_list': [ALLOTMENT_REF]
2022-12-28 16:53:17 +03:00
}
2022-12-29 21:07:30 +03:00
response = client.post('/leasing/v1/lessor', json=payload, headers={'authorization': __bearer_token(ORIGIN_REF)})
2022-12-28 16:53:17 +03:00
assert response.status_code == 200
client_challenge = response.json().get('client_challenge')
assert client_challenge == payload.get('client_challenge')
2025-04-14 14:26:58 +03:00
signature = eval(response.headers.get('X-NLS-Signature'))
2025-04-14 11:48:43 +03:00
assert len(signature) == 512
2025-04-14 22:38:01 +03:00
signature = bytes.fromhex(signature.decode('ascii'))
assert len(signature) == 256
my_si_public_key.verify_signature(signature, response.content)
lease_result_list = response.json().get('lease_result_list')
2022-12-28 16:53:17 +03:00
assert len(lease_result_list) == 1
assert len(lease_result_list[0]['lease']['ref']) == 36
assert str(UUID(lease_result_list[0]['lease']['ref'])) == lease_result_list[0]['lease']['ref']
2025-04-14 21:05:10 +03:00
assert lease_result_list[0]['lease']['product_name'] == 'NVIDIA Virtual Applications'
assert lease_result_list[0]['lease']['feature_name'] == 'GRID-Virtual-Apps'
2022-12-23 09:48:47 +03:00
def test_leasing_v1_lessor_lease():
2022-12-29 21:07:30 +03:00
response = client.get('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)})
2022-12-28 16:53:17 +03:00
assert response.status_code == 200
active_lease_list = response.json().get('active_lease_list')
2022-12-28 16:53:17 +03:00
assert len(active_lease_list) == 1
assert len(active_lease_list[0]) == 36
assert str(UUID(active_lease_list[0])) == active_lease_list[0]
2022-12-23 09:48:47 +03:00
def test_leasing_v1_lease_renew():
response = client.get('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)})
active_lease_list = response.json().get('active_lease_list')
active_lease_ref = active_lease_list[0]
###
payload = {'client_challenge': 'my_unique_string'}
response = client.put(f'/leasing/v1/lease/{active_lease_ref}', json=payload, headers={'authorization': __bearer_token(ORIGIN_REF)})
2022-12-28 16:53:17 +03:00
assert response.status_code == 200
client_challenge = response.json().get('client_challenge')
assert client_challenge == payload.get('client_challenge')
2025-04-14 14:26:58 +03:00
signature = eval(response.headers.get('X-NLS-Signature'))
2025-04-14 12:05:17 +03:00
assert len(signature) == 512
2025-04-14 22:38:01 +03:00
signature = bytes.fromhex(signature.decode('ascii'))
assert len(signature) == 256
my_si_public_key.verify_signature(signature, response.content)
lease_ref = response.json().get('lease_ref')
assert len(lease_ref) == 36
assert lease_ref == active_lease_ref
2022-12-23 09:48:47 +03:00
def test_leasing_v1_lease_delete():
response = client.get('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)})
active_lease_list = response.json().get('active_lease_list')
active_lease_ref = active_lease_list[0]
###
response = client.delete(f'/leasing/v1/lease/{active_lease_ref}', headers={'authorization': __bearer_token(ORIGIN_REF)})
assert response.status_code == 200
lease_ref = response.json().get('lease_ref')
assert len(lease_ref) == 36
assert lease_ref == active_lease_ref
2022-12-23 09:48:47 +03:00
def test_leasing_v1_lessor_lease_remove():
2025-03-18 12:33:52 +03:00
# see "test_leasing_v1_lessor()"
payload = {
'fulfillment_context': {
'fulfillment_class_ref_list': []
},
'lease_proposal_list': [{
'license_type_qualifiers': {'count': 1},
2025-04-14 21:05:10 +03:00
'product': {'name': 'NVIDIA Virtual Applications'}
2025-03-18 12:33:52 +03:00
}],
'proposal_evaluation_mode': 'ALL_OF',
'scope_ref_list': [ALLOTMENT_REF]
}
response = client.post('/leasing/v1/lessor', json=payload, headers={'authorization': __bearer_token(ORIGIN_REF)})
lease_result_list = response.json().get('lease_result_list')
lease_ref = lease_result_list[0]['lease']['ref']
#
2022-12-29 21:07:30 +03:00
response = client.delete('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)})
2022-12-28 16:53:17 +03:00
assert response.status_code == 200
released_lease_list = response.json().get('released_lease_list')
2022-12-28 16:53:17 +03:00
assert len(released_lease_list) == 1
assert len(released_lease_list[0]) == 36
assert released_lease_list[0] == lease_ref