164 lines
5.2 KiB
Python
164 lines
5.2 KiB
Python
from base58 import b58decode
|
|
from sanic import response as sanic_response
|
|
|
|
from app.core._crypto.signer import Signer
|
|
from app.core._secrets import hot_seed
|
|
from app.core.logger import make_log
|
|
from app.core.models.keys import KnownKey
|
|
from app.core.models._telegram.wrapped_bot import Wrapped_CBotChat
|
|
from app.core.models.user_activity import UserActivity
|
|
from app.core.models.user import User
|
|
from app.core.storage import Session
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
def attach_headers(response):
    """Set the CORS response headers on *response* and return it.

    The same object that was passed in is mutated and returned, so the call
    can be used inline (``return attach_headers(resp)``).
    """
    # NOTE(review): a wildcard origin together with Allow-Credentials "true"
    # is, as far as I know, rejected by browsers for credentialed requests -
    # confirm whether credentialed cross-origin calls are actually needed.
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
        (
            "Access-Control-Allow-Headers",
            "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site",
        ),
        ("Access-Control-Allow-Credentials", "true"),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
|
|
|
|
|
|
async def try_authorization(request):
    """Authenticate the request from its ``Authorization`` header, if present.

    The header value is a base58 string that must decode to exactly 57 bytes,
    laid out as: byte 0 - token version (must be 1), bytes 1-16 - user id,
    bytes 17-24 - timestamp, bytes 25-56 - remainder not inspected here.
    The raw header value must also match the ``seed`` of a ``KnownKey`` of
    type ``"USER_API_V1"`` whose ``meta['I_user_id']`` equals the embedded
    user id.

    On success this sets ``request.ctx.user``, ``request.ctx.user_key`` and
    rebinds both ``Wrapped_CBotChat`` wrappers to the user's ``telegram_id``.
    On any failure a warning is logged (except for a missing header) and the
    function returns without modifying ``request.ctx``, leaving the request
    unauthenticated.

    Returns:
        None in every case; the outcome is communicated via ``request.ctx``.
    """
    token = request.headers.get("Authorization")
    if not token:
        return

    try:
        token_bin = b58decode(token)
    except ValueError:
        # A malformed header is client input, not a server fault: treat it as
        # a failed login instead of letting the exception fail the request.
        make_log("auth", "Invalid token encoding", level="warning")
        return

    if len(token_bin) != 57:
        make_log("auth", "Invalid token length", level="warning")
        return

    known_key = request.ctx.db_session.query(KnownKey).filter(KnownKey.seed == token).first()
    if not known_key:
        make_log("auth", "Unknown key", level="warning")
        return

    if known_key.type != "USER_API_V1":
        make_log("auth", "Invalid key type", level="warning")
        return

    token_version = token_bin[0]
    user_id = int.from_bytes(token_bin[1:17], 'big')
    timestamp = int.from_bytes(token_bin[17:25], 'big')

    # Explicit checks instead of ``assert``: assertions are stripped under
    # ``python -O`` and would otherwise surface as unhandled AssertionError.
    if token_version != 1:
        make_log("auth", "Invalid token version", level="warning")
        return
    if user_id <= 0:
        make_log("auth", "Invalid user_id", level="warning")
        return
    if timestamp <= 0:
        make_log("auth", "Invalid timestamp", level="warning")
        return

    key_user_id = known_key.meta.get('I_user_id', -1)
    if key_user_id != user_id:
        make_log("auth", f"User ID mismatch: {key_user_id} != {user_id}", level="warning")
        return

    user = request.ctx.db_session.query(User).filter(User.id == known_key.meta['I_user_id']).first()
    if not user:
        make_log("auth", "No user from key", level="warning")
        return

    request.ctx.user = user
    request.ctx.user_key = known_key

    # Replace the anonymous wrappers with ones bound to this user's chat.
    request.ctx.user_uploader_wrapper = Wrapped_CBotChat(
        request.app.ctx.memory._telegram_bot,
        chat_id=user.telegram_id,
        db_session=request.ctx.db_session,
    )
    request.ctx.user_client_wrapper = Wrapped_CBotChat(
        request.app.ctx.memory._client_telegram_bot,
        chat_id=user.telegram_id,
        db_session=request.ctx.db_session,
    )
|
|
|
|
|
|
async def try_service_authorization(request):
    """Verify a service-to-service signature carried in the request headers.

    Reads ``X-Service-Signature`` and the base58-encoded ``X-Message-Hash``.
    When both are present and ``Signer(hot_seed).verify`` accepts the pair,
    the decoded hash is stored in ``request.ctx.verified_hash``. In every
    other case ``request.ctx`` is left untouched.

    Returns:
        None in every case; the outcome is communicated via ``request.ctx``.
    """
    signature = request.headers.get('X-Service-Signature')
    if not signature:
        return

    message_hash_b58 = request.headers.get('X-Message-Hash')
    if not message_hash_b58:
        return

    try:
        message_hash = b58decode(message_hash_b58)
    except ValueError:
        # Header content is caller-controlled; a non-base58 value means
        # "not verified", not a server error.
        make_log("auth", "Invalid service message hash encoding", level="warning")
        return

    signer = Signer(hot_seed)
    if signer.verify(message_hash, signature):
        request.ctx.verified_hash = message_hash
|
|
|
|
|
|
def _client_ip(request):
    """Return the first address of ``X-Forwarded-for``, falling back to
    ``request.remote_addr`` and then ``request.ip``.

    Raises if none of the sources yields a string; the caller treats that as
    "IP unknown".
    """
    raw_ip = request.headers.get('X-Forwarded-for') or request.remote_addr or request.ip
    # X-Forwarded-for may hold a comma-separated list; keep the first entry.
    return raw_ip.split(",")[0].strip()


async def save_activity(request):
    """Persist a ``UserActivity`` row of type ``API_V1_REQUEST`` for *request*.

    Request details (path, query args, JSON body, method, client IP, headers)
    are collected best-effort: a field that cannot be read is simply omitted
    from the stored ``meta``. The row is added to ``request.ctx.db_session``
    and committed immediately.

    ``user_id`` is taken from ``request.ctx.user`` when it is set, otherwise
    ``None``; ``user_ip`` falls back to ``"0.0.0.0"`` when no IP could be
    determined.
    """
    # NOTE(review): the full header dict is stored, which includes the
    # ``Authorization`` value when the client sent one - confirm that
    # persisting it is intended.
    collectors = (
        ("path", lambda: request.path),
        ("args", lambda: dict(request.args)),
        ("json", lambda: dict(request.json)),
        ("method", lambda: request.method),
        ("ip", lambda: _client_ip(request)),
        ("headers", lambda: dict(request.headers)),
    )

    activity_meta = {}
    for field, collect in collectors:
        try:
            activity_meta[field] = collect()
        except Exception:
            # Deliberate best-effort: e.g. a request without a JSON body makes
            # dict(None) raise. Only Exception is caught so that
            # KeyboardInterrupt/SystemExit are never swallowed. The key is
            # left unset so the defaults below apply.
            continue

    new_user_activity = UserActivity(
        type="API_V1_REQUEST",
        meta=activity_meta,
        user_id=request.ctx.user.id if request.ctx.user else None,
        user_ip=activity_meta.get("ip", "0.0.0.0"),
        created=datetime.now(),
    )
    request.ctx.db_session.add(new_user_activity)
    request.ctx.db_session.commit()
|
|
|
|
|
|
async def attach_user_to_request(request):
    """Prepare per-request context and run the authentication steps.

    ``OPTIONS`` requests are answered right away with an ``"OK"`` text
    response carrying the CORS headers; no context is set up for them.

    For every other method a fresh ``Session`` is opened and stored on
    ``request.ctx`` together with anonymous defaults (``verified_hash``,
    ``user``, ``user_key`` all ``None``, and two ``Wrapped_CBotChat`` wrappers
    created without a ``chat_id``). Then, in this order: user authorization,
    activity logging, service-signature verification.
    """
    if request.method == 'OPTIONS':
        preflight_reply = sanic_response.text("OK")
        return attach_headers(preflight_reply)

    ctx = request.ctx
    session = Session()
    ctx.db_session = session
    ctx.verified_hash = None
    ctx.user = None
    ctx.user_key = None

    bot_registry = request.app.ctx.memory
    ctx.user_uploader_wrapper = Wrapped_CBotChat(bot_registry._telegram_bot, db_session=session)
    ctx.user_client_wrapper = Wrapped_CBotChat(bot_registry._client_telegram_bot, db_session=session)

    # Activity is saved after user authorization so the row can carry the
    # user id, and before the service-signature check.
    await try_authorization(request)
    await save_activity(request)
    await try_service_authorization(request)
|
|
|
|
|
|
async def close_request_handler(request, response):
    """Release the request's DB session and normalise ``OPTIONS`` responses.

    For ``OPTIONS`` requests the incoming *response* is replaced by a plain
    ``"OK"`` text response. The session stored in ``request.ctx.db_session``
    (if any) is closed best-effort.

    Returns:
        A ``(request, response)`` tuple; ``response`` is the replacement
        object for ``OPTIONS`` requests and the original one otherwise.
    """
    if request.method == 'OPTIONS':
        response = sanic_response.text("OK")

    # The session attribute is absent when request setup returned early
    # (e.g. for OPTIONS), so look it up defensively instead of relying on a
    # swallowed AttributeError.
    db_session = getattr(request.ctx, "db_session", None)
    if db_session is not None:
        try:
            db_session.close()
        except Exception:
            # Closing is best-effort and must not break the response; only
            # Exception is caught so KeyboardInterrupt/SystemExit propagate.
            pass

    return request, response
|
|
|
|
|
|
async def close_db_session(request, response):
    """Close the request's DB session and return the response with CORS headers.

    Delegates the cleanup to ``close_request_handler`` and then passes the
    response it returns through ``attach_headers``.
    """
    _, final_response = await close_request_handler(request, response)
    return attach_headers(final_response)
|