"""Per-request helpers: CORS headers, API-token auth, activity log, DB cleanup."""
from datetime import datetime, timedelta

from base58 import b58decode
from sanic import response as sanic_response
from sqlalchemy import select

from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.logger import make_log
from app.core.models.keys import KnownKey
from app.core.models._telegram.wrapped_bot import Wrapped_CBotChat
from app.core.models.user_activity import UserActivity
from app.core.models.user import User
from app.core.storage import new_session

# Decoded API token layout (57 bytes in total):
#   [0:1]   token version
#   [1:17]  user id, big-endian
#   [17:25] timestamp, big-endian
#   [25:]   remaining bytes (not inspected here)
_TOKEN_LENGTH = 57
_TOKEN_VERSION = 1
_USER_KEY_TYPE = "USER_API_V1"


def attach_headers(response):
    """Set the CORS headers on *response* and return the same object."""
    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
    response.headers["Access-Control-Allow-Headers"] = (
        "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, "
        "Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, "
        "x-file-name, x-last-chunk, x-chunk-start, x-upload-id"
    )
    # response.headers["Access-Control-Allow-Credentials"] = "true"
    return response


async def try_authorization(request) -> None:
    """Authenticate the request from its ``Authorization`` header, if possible.

    On success ``request.ctx.user`` and ``request.ctx.user_key`` are set and
    both bot wrappers on ``request.ctx`` are rebuilt for the user's Telegram
    chat. On any failure the function logs a warning (except for a missing
    header, which is silent) and returns without touching ``request.ctx``,
    so the request continues unauthenticated.
    """
    token = request.headers.get("Authorization")
    if not token:
        return

    # b58decode raises ValueError on characters outside the base58 alphabet;
    # a malformed header must not abort the whole request.
    try:
        token_bin = b58decode(token)
    except ValueError:
        make_log("auth", "Invalid token encoding", level="warning")
        return

    if len(token_bin) != _TOKEN_LENGTH:
        make_log("auth", "Invalid token length", level="warning")
        return

    result = await request.ctx.db_session.execute(
        select(KnownKey).where(KnownKey.seed == token)
    )
    known_key = result.scalars().first()
    if not known_key:
        make_log("auth", "Unknown key", level="warning")
        return

    if known_key.type != _USER_KEY_TYPE:
        make_log("auth", "Invalid key type", level="warning")
        return

    token_version = int.from_bytes(token_bin[0:1], "big")
    user_id = int.from_bytes(token_bin[1:17], "big")
    timestamp = int.from_bytes(token_bin[17:25], "big")

    # Explicit checks instead of `assert`: asserts vanish under `python -O`
    # and otherwise surface as an unhandled AssertionError.
    if token_version != _TOKEN_VERSION:
        make_log("auth", "Invalid token version", level="warning")
        return
    if user_id <= 0:
        make_log("auth", "Invalid user_id", level="warning")
        return
    if timestamp <= 0:
        make_log("auth", "Invalid timestamp", level="warning")
        return

    key_user_id = known_key.meta.get("I_user_id", -1)
    if key_user_id != user_id:
        make_log(
            "auth",
            f"User ID mismatch: {key_user_id} != {user_id}",
            level="warning",
        )
        return

    result = await request.ctx.db_session.execute(
        select(User).where(User.id == key_user_id)
    )
    user = result.scalars().first()
    if not user:
        make_log("auth", "No user from key", level="warning")
        return

    request.ctx.user = user
    request.ctx.user_key = known_key
    request.ctx.user_uploader_wrapper = Wrapped_CBotChat(
        request.app.ctx.memory._telegram_bot,
        chat_id=user.telegram_id,
        db_session=request.ctx.db_session,
        user=user,
    )
    request.ctx.user_client_wrapper = Wrapped_CBotChat(
        request.app.ctx.memory._client_telegram_bot,
        chat_id=user.telegram_id,
        db_session=request.ctx.db_session,
        user=user,
    )


async def try_service_authorization(request) -> None:
    """Verify the service signature headers and record the verified hash.

    Reads ``X-Service-Signature`` and ``X-Message-Hash``; when the signature
    verifies against ``hot_seed``, stores the decoded hash in
    ``request.ctx.verified_hash``. Otherwise leaves ``request.ctx`` unchanged.
    """
    signature = request.headers.get("X-Service-Signature")
    if not signature:
        return

    # TODO: what is the point of this check if the value can be spoofed?
    message_hash_b58 = request.headers.get("X-Message-Hash")
    if not message_hash_b58:
        return

    # A non-base58 header value raises ValueError; treat it as "not verified".
    try:
        message_hash = b58decode(message_hash_b58)
    except ValueError:
        make_log("auth", "Invalid message hash encoding", level="warning")
        return

    signer = Signer(hot_seed)
    if signer.verify(message_hash, signature):
        request.ctx.verified_hash = message_hash


async def save_activity(request) -> None:
    """Persist a ``UserActivity`` row describing the current request.

    Each piece of metadata is collected on a best-effort basis: a failure to
    read one field leaves that field out and never aborts the request.
    Requests whose path contains ``'system'`` are not recorded.
    """
    activity_meta = {}

    # `except Exception` (not a bare `except`) keeps the best-effort behaviour
    # without swallowing task cancellation or KeyboardInterrupt.
    try:
        activity_meta["path"] = request.path
        if "system" in activity_meta["path"]:
            return
    except Exception:
        pass

    try:
        activity_meta["args"] = dict(request.args)
    except Exception:
        pass

    try:
        activity_meta["json"] = dict(request.json)
    except Exception:
        pass

    try:
        activity_meta["method"] = request.method
    except Exception:
        pass

    try:
        forwarded_for = request.headers.get("X-Forwarded-for")
        activity_meta["ip"] = forwarded_for or request.remote_addr or request.ip
        # X-Forwarded-For may hold a comma-separated chain; keep the first entry.
        activity_meta["ip"] = activity_meta["ip"].split(",")[0].strip()
    except Exception:
        pass

    try:
        # NOTE(review): this stores every request header, including
        # Authorization, in the activity record - confirm that is intended.
        activity_meta["headers"] = dict(request.headers)
    except Exception:
        pass

    new_user_activity = UserActivity(
        type="API_V1_REQUEST",
        meta=activity_meta,
        user_id=request.ctx.user.id if request.ctx.user else None,
        # `or` also covers the case where "ip" is present but None/empty.
        user_ip=activity_meta.get("ip") or "0.0.0.0",
        created=datetime.now(),
    )
    request.ctx.db_session.add(new_user_activity)
    await request.ctx.db_session.commit()


async def attach_user_to_request(request):
    """Prepare ``request.ctx`` for a request and run authorization.

    For ``OPTIONS`` requests returns an ``"OK"`` text response with CORS
    headers and does nothing else. Otherwise opens a DB session, resets the
    auth-related ``request.ctx`` fields to anonymous defaults, then runs user
    authorization, activity logging and service authorization in that order.
    """
    if request.method == "OPTIONS":
        return attach_headers(sanic_response.text("OK"))

    request.ctx.db_session = new_session()
    request.ctx.verified_hash = None
    request.ctx.user = None
    request.ctx.user_key = None
    request.ctx.user_uploader_wrapper = Wrapped_CBotChat(
        request.app.ctx.memory._telegram_bot,
        db_session=request.ctx.db_session,
    )
    request.ctx.user_client_wrapper = Wrapped_CBotChat(
        request.app.ctx.memory._client_telegram_bot,
        db_session=request.ctx.db_session,
    )

    await try_authorization(request)
    await save_activity(request)
    await try_service_authorization(request)


async def close_request_handler(request, response):
    """Close the request's DB session and attach CORS headers.

    For ``OPTIONS`` requests the response is replaced by an ``"OK"`` text
    response. Returns the ``(request, response)`` pair.
    """
    if request.method == "OPTIONS":
        response = sanic_response.text("OK")

    # OPTIONS requests never get a session (see attach_user_to_request), so
    # look it up defensively instead of relying on a swallowed AttributeError.
    db_session = getattr(request.ctx, "db_session", None)
    if db_session is not None:
        try:
            await db_session.close()
        except Exception:
            # Best effort: a failing close must not break the response.
            pass

    response = attach_headers(response)
    return request, response


async def close_db_session(request, response):
    """Run ``close_request_handler`` and return only the response."""
    # close_request_handler has already attached the CORS headers.
    _, response = await close_request_handler(request, response)
    return response