mirror of
https://git.collinwebdesigns.de/oscar.krause/fastapi-dls.git
synced 2024-12-27 14:09:33 +03:00
main.py - replaced SITE_KEY and INSTANCE_KEY with only INSTANCE_KEY
This commit is contained in:
parent
c6607bedba
commit
78ddaa56d3
31
app/main.py
31
app/main.py
@ -21,11 +21,9 @@ app = FastAPI()
|
||||
LEASE_EXPIRE_DELTA = relativedelta(minutes=15) # days=90
|
||||
|
||||
URL = '192.168.178.196'
|
||||
SITE_KEY_FILE = load_key('/opt/fastapi-dls/site.key')
|
||||
SITE_KEY_XID = '00000000-0000-0000-0000-000000000000'
|
||||
|
||||
SITE_KEY_RSA = private_bytes(SITE_KEY_FILE)
|
||||
SITE_KEY_PUB = public_key(SITE_KEY_FILE)
|
||||
INSTANCE_KEY_RSA = load_key('cert/instance.private.pem')
|
||||
INSTANCE_KEY_PUB = load_key('cert/instance.public.pem')
|
||||
|
||||
|
||||
@app.get('/')
|
||||
@ -41,11 +39,10 @@ async def status(request: Request):
|
||||
# venv/lib/python3.9/site-packages/nls_core_service_instance/service_instance_token_manager.py
|
||||
@app.get('/client-token')
|
||||
async def client_token():
|
||||
public_key_me = SITE_KEY_FILE.public_key().public_numbers()
|
||||
service_instance_public_key_me = {
|
||||
"mod": hex(public_key_me.n)[2:],
|
||||
"exp": public_key_me.e,
|
||||
},
|
||||
"mod": hex(INSTANCE_KEY_PUB.public_key().n)[2:],
|
||||
"exp": INSTANCE_KEY_PUB.public_key().e,
|
||||
}
|
||||
|
||||
cur_time = datetime.utcnow()
|
||||
exp_time = cur_time + relativedelta(years=12)
|
||||
@ -53,9 +50,9 @@ async def client_token():
|
||||
"jti": str(uuid4()),
|
||||
"iss": "NLS Service Instance",
|
||||
"aud": "NLS Licensed Client",
|
||||
"iat": cur_time,
|
||||
"nbf": cur_time,
|
||||
"exp": exp_time,
|
||||
"iat": timegm(cur_time.timetuple()),
|
||||
"nbf": timegm(cur_time.timetuple()),
|
||||
"exp": timegm(exp_time.timetuple()),
|
||||
"update_mode": "ABSOLUTE",
|
||||
"scope_ref_list": [
|
||||
"482f24b5-0a60-4ec2-a63a-9ed00bc2534e"
|
||||
@ -78,13 +75,13 @@ async def client_token():
|
||||
},
|
||||
"service_instance_public_key_configuration": {
|
||||
"service_instance_public_key_me": service_instance_public_key_me,
|
||||
"service_instance_public_key_pem": SITE_KEY_PUB.decode('utf-8'),
|
||||
"service_instance_public_key_pem": INSTANCE_KEY_PUB.export_key().decode('utf-8'),
|
||||
"key_retention_mode": "LATEST_ONLY"
|
||||
}
|
||||
}
|
||||
|
||||
key = jwk.construct(SITE_KEY_RSA, algorithm=ALGORITHMS.RS512)
|
||||
data = jwt.encode(payload, key=key, headers=None, algorithm='RS256')
|
||||
key = jwk.construct(INSTANCE_KEY_RSA.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS256)
|
||||
data = jws.sign(payload, key=key, headers=None, algorithm='RS256')
|
||||
|
||||
response = StreamingResponse(iter([data]), media_type="text/plain")
|
||||
response.headers["Content-Disposition"] = f'attachment; filename=client_configuration_token_{datetime.now().strftime("%d-%m-%y-%H-%M-%S")}'
|
||||
@ -144,7 +141,7 @@ async def code(request: Request):
|
||||
kid = payload.get('kid')
|
||||
if kid:
|
||||
headers = {'kid': kid}
|
||||
key = jwk.construct(SITE_KEY_RSA, algorithm=ALGORITHMS.RS512)
|
||||
key = jwk.construct(INSTANCE_KEY_RSA.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS512)
|
||||
auth_code = jws.sign(payload, key, headers=headers, algorithm='RS256')
|
||||
|
||||
response = {
|
||||
@ -165,7 +162,7 @@ async def token(request: Request):
|
||||
# {"auth_code":"eyJhbGciOiJSUzI1NiIsImtpZCI6IjAwMDAwMDAwLTAwMDAtMDAwMC0wMDAwLTAwMDAwMDAwMDAwMCIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2NzExODI5MTQsImV4cCI6MTY3MTI2OTMxNCwiY2hhbGxlbmdlIjoiaXdZdFpIME03K0ZZUWdRQXEwbjhabThWcFpJbWdtV1NDSXI1MkdTSlMxayIsIm9yaWdpbl9yZWYiOiJpd1l0WkgwTTcrRllRZ1FBcTBuOFptOFZwWkltZ21XU0NJcjUyR1NKUzFrIiwia2V5X3JlZiI6IjAwMDAwMDAwLTAwMDAtMDAwMC0wMDAwLTAwMDAwMDAwMDAwMCIsImtpZCI6IjAwMDAwMDAwLTAwMDAtMDAwMC0wMDAwLTAwMDAwMDAwMDAwMCJ9.hkBPQx7UbXqwRzpTSp5fASwLg7rJOgjDOGD98Zh6pEkPW09KjxcsaHKeR8KIZmDS1S_kLed93-UzUY4wXAylFBlM-daL-TEbHJau2muZGWXPrtdsGLI9CLFcc0dmocq1_5rnRV3liqjdZwL8djK9Fx_5tOzEfeI9oCJ49Sh2LD_p1vkFcqUv9z9mVL9IGsoRM6y4hJ2YKBloijzhMLp5E7nojyD6Z8PQZ0mOIOc3tncAaXQS47JhgGsJPUDR-YoLF5uNpAlJKZP2eZWJt3P7MvhIz3lxFPUJ5jHX64Vf0Ds10-GBctZuy1-eCLBXj74uQy_U4KlnCif-5N8bPTvgxw","code_verifier":"CgnDPaugQCb4U6l3EfJSFsA/JxMqNO4TqONeb9yl8EVRWU88yTPlEeJgZQO0f/JVnScYOsvwa0jcvTAMBulEKgucfxDDVL1cBOylGugQ0QlJsXU5hJ8VLAQtOyPthnVyEutERNyOKVwl3YI5Z5EfUcfuhDqmxBUpnAFtQ9H3R3g"}
|
||||
|
||||
# payload = self._security.get_valid_payload(req.auth_code) # todo
|
||||
key = jwk.construct(SITE_KEY_PUB, algorithm=ALGORITHMS.RS512)
|
||||
key = jwk.construct(INSTANCE_KEY_PUB.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS512)
|
||||
payload = jwt.decode(token=j['auth_code'], key=key)
|
||||
|
||||
# validate the code challenge
|
||||
@ -190,7 +187,7 @@ async def token(request: Request):
|
||||
kid = payload.get('kid')
|
||||
if kid:
|
||||
headers = {'kid': kid}
|
||||
key = jwk.construct(SITE_KEY_RSA, algorithm=ALGORITHMS.RS512)
|
||||
key = jwk.construct(INSTANCE_KEY_RSA.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS512)
|
||||
auth_token = jwt.encode(new_payload, key=key, headers=headers, algorithm='RS256')
|
||||
|
||||
response = {
|
||||
|
Loading…
Reference in New Issue
Block a user