uploader-bot/app/api/middleware.py

74 lines
2.2 KiB
Python

from app.core.models.user import User
from app.core.models.keys import KnownKey
from app.core.storage import Session
from app.core.logger import make_log
from base58 import b58encode, b58decode
async def try_authorization(request):
    """Attach the authenticated user to ``request.ctx`` if the token is valid.

    The ``Authorization`` header is read as a base58-encoded token, looked up
    as a ``KnownKey`` seed, and cross-checked against the fields packed into
    the token itself. On success ``request.ctx.user`` and
    ``request.ctx.user_key`` are set. On any failure the function returns
    without touching ``request.ctx`` (and, except for a missing header, logs a
    warning), so the request continues unauthenticated.

    Decoded token layout (57 bytes, big-endian integers):
        byte  0       token version (must be 1)
        bytes 1..16   user id
        bytes 17..24  timestamp
        bytes 25..56  random part (not inspected here)

    Args:
        request: Incoming request exposing ``headers`` and ``ctx.db_session``.

    Returns:
        None in every case; the result is communicated through ``request.ctx``.
    """
    token = request.headers.get("Authorization")
    if not token:
        return

    try:
        token_bin = b58decode(token)
    except ValueError:
        # The header is client-controlled: characters outside the base58
        # alphabet (or non-ASCII text) must not crash the middleware.
        make_log("auth", "Invalid token encoding", level="warning")
        return

    if len(token_bin) != 57:
        make_log("auth", "Invalid token length", level="warning")
        return

    known_key = request.ctx.db_session.query(KnownKey).filter(KnownKey.seed == token).first()
    if not known_key:
        make_log("auth", "Unknown key", level="warning")
        return

    if known_key.type != "USER_API_V1":
        make_log("auth", "Invalid key type", level="warning")
        return

    token_version = int.from_bytes(token_bin[0:1], 'big')
    user_id = int.from_bytes(token_bin[1:17], 'big')
    timestamp = int.from_bytes(token_bin[17:25], 'big')

    # Explicit checks instead of `assert`: assertions are removed under
    # `python -O`, and a failed one would surface as an unhandled error
    # instead of following the log-and-return path used everywhere else.
    if token_version != 1:
        make_log("auth", "Invalid token version", level="warning")
        return
    if user_id <= 0:
        make_log("auth", "Invalid user_id", level="warning")
        return
    if timestamp <= 0:
        make_log("auth", "Invalid timestamp", level="warning")
        return

    # The user id embedded in the token must match the one stored with the key.
    key_user_id = known_key.meta.get('I_user_id', -1)
    if key_user_id != user_id:
        make_log("auth", f"User ID mismatch: {key_user_id} != {user_id}", level="warning")
        return

    user = request.ctx.db_session.query(User).filter(User.id == key_user_id).first()
    if not user:
        make_log("auth", "No user from key", level="warning")
        return

    request.ctx.user = user
    request.ctx.user_key = known_key
async def attach_user_to_request(request):
    """Give the request a fresh DB session, then attempt token authorization.

    The session is stored on ``request.ctx.db_session`` before
    ``try_authorization`` runs, because that function queries through it.

    Args:
        request: Incoming request whose ``ctx`` receives the session.
    """
    db_session = Session()
    request.ctx.db_session = db_session
    await try_authorization(request)
async def close_db_session(request, response):
    """Close the request's DB session and set permissive CORS headers.

    Closing is best effort: an ordinary error (including a missing
    ``request.ctx.db_session``) is logged and ignored so the headers are still
    applied. Non-``Exception`` signals such as task cancellation or
    ``KeyboardInterrupt`` are deliberately not swallowed.

    Args:
        request: Request whose ``ctx.db_session`` should be closed.
        response: Response object whose ``headers`` mapping is updated.
    """
    if request.method == 'OPTIONS':
        # NOTE(review): this only rebinds the local name, and no `return` is
        # visible in this block -- confirm the replacement response actually
        # reaches the client, and that `response` exposes a `text()` factory.
        response = response.text("OK")

    try:
        request.ctx.db_session.close()
    except Exception as exc:
        # Keep the cleanup best effort, but leave a trace instead of
        # discarding the failure silently.
        make_log("db", f"Failed to close DB session: {exc!r}", level="warning")

    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
    response.headers["Access-Control-Allow-Headers"] = "Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token, Authorization, Refer"