uploader-bot/app/api/routes/auth.py

197 lines
7.5 KiB
Python

from datetime import datetime
from uuid import uuid4
from aiogram.utils.web_app import safe_parse_webapp_init_data
from sanic import response
from sqlalchemy import select, and_
from tonsdk.utils import Address
from app.core._config import TELEGRAM_API_KEY, CLIENT_TELEGRAM_API_KEY
from app.core.logger import make_log
from app.core.models import KnownKey, WalletConnection
from app.core.models.user import User
from pytonconnect.parsers import WalletInfo, Account, TonProof
def _parse_twa_init_data(raw_init_data):
    """Validate Telegram Web App init data against the configured bot tokens.

    Tries ``TELEGRAM_API_KEY`` first and ``CLIENT_TELEGRAM_API_KEY`` second.

    Returns:
        The first successfully parsed init data object that carries a user,
        or ``None`` when the data is missing or no token validates it.
    """
    if not raw_init_data:
        return None
    for validation_api_key in (TELEGRAM_API_KEY, CLIENT_TELEGRAM_API_KEY):
        try:
            parsed = safe_parse_webapp_init_data(token=validation_api_key, init_data=raw_init_data)
        except Exception:
            # Wrong token or malformed data: try the next token. The raw
            # string is left untouched so the next attempt sees the same input.
            continue
        # Init data without a user cannot be mapped to an account, so it is
        # treated as invalid instead of crashing later on `parsed.user.id`.
        if parsed and parsed.user:
            return parsed
    return None


async def _connect_ton_wallet(db_session, user, ton_proof):
    """Verify a TON proof and store it as the user's active wallet connection.

    On success, every existing 'ton' connection of ``user`` and every
    connection (of any user) with the same wallet address is marked
    invalidated, a new ``WalletConnection`` is added, and the session is
    committed.

    Args:
        db_session: Async DB session taken from the request context.
        user: The authenticated ``User`` row.
        ton_proof: Client-supplied dict with ``account`` and ``ton_proof`` keys.

    Raises:
        ValueError: If the payload is unknown, was issued for another user, or
            the proof does not verify.
        Exception: Anything raised while parsing the untrusted ``ton_proof``
            structure (missing keys, wrong types, malformed address).
    """
    account = ton_proof['account']
    # Copy 'chain' into 'network' before parsing (presumably the key
    # Account.from_dict expects - confirm against pytonconnect). This mutates
    # the incoming dict on purpose: the same dict is persisted in `keys` below.
    account['network'] = account['chain']

    wallet_info = WalletInfo()
    wallet_info.account = Account.from_dict(account)
    wallet_info.ton_proof = TonProof.from_dict({'proof': ton_proof['ton_proof']})
    connection_payload = ton_proof['ton_proof']['payload']

    known_payload = (await db_session.execute(
        select(KnownKey).where(KnownKey.seed == connection_payload)
    )).scalars().first()

    # Explicit raises instead of `assert`: assertions are stripped under
    # `python -O`, which would silently disable these security checks.
    if not known_payload:
        raise ValueError("Unknown payload")
    if known_payload.meta['I_user_id'] != user.id:
        raise ValueError("Invalid user_id")
    if not wallet_info.check_proof(connection_payload):
        raise ValueError("Invalid proof")

    # Parse the address before touching any row, so a malformed address cannot
    # leave half-applied invalidation flags in the session.
    wallet_address = Address(wallet_info.account.address).to_string(1, 1, 1)

    own_ton_connections = (await db_session.execute(select(WalletConnection).where(
        and_(
            WalletConnection.user_id == user.id,
            WalletConnection.network == 'ton'
        )
    ))).scalars().all()
    for known_connection in own_ton_connections:
        known_connection.invalidated = True

    same_address_connections = (await db_session.execute(select(WalletConnection).where(
        WalletConnection.wallet_address == wallet_address
    ))).scalars().all()
    for other_connection in same_address_connections:
        other_connection.invalidated = True

    now = datetime.now()
    db_session.add(WalletConnection(
        user_id=user.id,
        network='ton',
        wallet_key='web2-client==1',
        connection_id=connection_payload,
        wallet_address=wallet_address,
        keys={
            'ton_proof': ton_proof
        },
        meta={},
        created=now,
        updated=now,
        invalidated=False,
        without_pk=False
    ))
    await db_session.commit()


async def s_api_v1_auth_twa(request):
    """Authenticate a Telegram Web App user and optionally attach a TON wallet.

    Reads ``twa_data``, ``ton_proof`` and ``ref_id`` from the JSON body
    (missing keys or an unreadable body are treated as ``None``; ``ref_id`` is
    accepted but not used here).

    Flow:
        1. Validate ``twa_data``; respond 401 if it is invalid.
        2. Look up the user by Telegram id, creating the row if absent.
        3. Issue a new ``USER_API_V1`` token.
        4. If ``ton_proof`` is present, verify and store it; respond 400 on
           any failure.
        5. Update ``last_use`` and respond with the user, the most recently
           created non-invalidated 'ton' connection (or ``None``) and the token.

    Raises:
        RuntimeError: If the user row cannot be read back after creation.
    """
    try:
        payload = request.json
    except Exception:
        # An unparsable body behaves like an empty one and ends in the 401 below.
        payload = None
    if not isinstance(payload, dict):
        payload = {}
    auth_data = {req_key: payload.get(req_key) for req_key in ('twa_data', 'ton_proof', 'ref_id')}

    db_session = request.ctx.db_session

    twa_data = _parse_twa_init_data(auth_data['twa_data'])
    if twa_data is None:
        make_log("auth", "Invalid TWA data", level="warning")
        return response.json({"error": "Invalid TWA data"}, status=401)

    user_query = select(User).where(User.telegram_id == twa_data.user.id)
    known_user = (await db_session.execute(user_query)).scalars().first()
    if not known_user:
        now = datetime.now()
        db_session.add(User(
            telegram_id=twa_data.user.id,
            username=twa_data.user.username,
            meta={
                "first_name": twa_data.user.first_name,
                "last_name": twa_data.user.last_name,
                "photo_url": twa_data.user.photo_url
            },
            lang_code=twa_data.user.language_code,
            last_use=now,
            created=now
        ))
        await db_session.commit()

        known_user = (await db_session.execute(user_query)).scalars().first()
        if not known_user:
            raise RuntimeError("User not created")

    new_user_key = await known_user.create_api_token_v1(db_session, "USER_API_V1")

    if auth_data['ton_proof']:
        try:
            await _connect_ton_wallet(db_session, known_user, auth_data['ton_proof'])
        except Exception as e:
            # `Exception`, not `BaseException`: task cancellation and
            # interpreter shutdown must propagate, not become a 400.
            make_log("auth", f"Invalid ton_proof: {e}", level="warning")
            # Discard any partially applied changes from the failed attempt.
            await db_session.rollback()
            return response.json({"error": "Invalid ton_proof"}, status=400)

    ton_connection = (await db_session.execute(select(WalletConnection).where(
        and_(
            WalletConnection.user_id == known_user.id,
            WalletConnection.network == 'ton',
            WalletConnection.invalidated == False  # noqa: E712 - SQL expression, not a Python comparison
        )
    ).order_by(WalletConnection.created.desc()))).scalars().first()

    known_user.last_use = datetime.now()
    await db_session.commit()

    return response.json({
        'user': known_user.json_format(),
        'connected_wallet': ton_connection.json_format() if ton_connection else None,
        'auth_v1_token': new_user_key['auth_v1_token']
    })
async def s_api_v1_auth_me(request):
    """Return the authenticated user and their active TON wallet connection.

    Responds 401 when the request context carries no user. Otherwise responds
    with the user's JSON form and the most recently created, non-invalidated
    'ton' wallet connection (``None`` when there is none).
    """
    current_user = request.ctx.user
    if not current_user:
        return response.json({"error": "Unauthorized"}, status=401)

    active_ton_filter = and_(
        WalletConnection.user_id == current_user.id,
        WalletConnection.network == 'ton',
        WalletConnection.invalidated == False  # noqa: E712 - SQL expression, not a Python comparison
    )
    newest_first = (
        select(WalletConnection)
        .where(active_ton_filter)
        .order_by(WalletConnection.created.desc())
    )
    query_result = await request.ctx.db_session.execute(newest_first)
    wallet = query_result.scalars().first()

    user_payload = current_user.json_format()
    if wallet:
        wallet_payload = wallet.json_format()
    else:
        wallet_payload = None

    return response.json({
        'user': user_payload,
        'connected_wallet': wallet_payload
    })
async def s_api_v1_auth_select_wallet(request):
    """Re-select a wallet the user has connected before.

    Expects a JSON object with ``wallet_address``. If the user already has a
    ``WalletConnection`` for that address (in canonical form), a fresh,
    non-invalidated copy of it is stored with a new ``connection_id`` and
    current timestamps.

    Responses:
        200 (empty body): the new connection was stored.
        400: the body is not a JSON object, ``wallet_address`` is missing, or
            it cannot be parsed as an address.
        401: the request context carries no user.
        404: the user has no connection for that address.
    """
    user = request.ctx.user
    if not user:
        return response.json({"error": "Unauthorized"}, status=401)

    try:
        data = request.json
    except Exception:
        return response.json({"error": "Invalid JSON"}, status=400)
    # A missing body yields None and a JSON array/scalar is not a mapping;
    # both would otherwise crash the key lookup below with a 500.
    if not isinstance(data, dict):
        return response.json({"error": "Invalid JSON"}, status=400)

    if "wallet_address" not in data:
        return response.json({"error": "wallet_address is required"}, status=400)

    # Convert the client-supplied address to the canonical form used for
    # storage (user-friendly, URL-safe, bounceable). The input is untrusted,
    # so a parse failure is a client error, not a server error.
    try:
        canonical_address = Address(data["wallet_address"]).to_string(1, 1, 1)
    except Exception:
        return response.json({"error": "Invalid wallet_address"}, status=400)

    db_session = request.ctx.db_session

    # Only wallets this user has connected before can be selected.
    existing_connection = (await db_session.execute(select(WalletConnection).where(
        and_(
            WalletConnection.user_id == user.id,
            WalletConnection.wallet_address == canonical_address
        )
    ))).scalars().first()
    if not existing_connection:
        return response.json({"error": "Wallet connection not found"}, status=404)

    now = datetime.now()
    new_connection = WalletConnection(
        keys=existing_connection.keys,
        meta=existing_connection.meta,
        wallet_key=existing_connection.wallet_key,
        # A random suffix makes the copied connection's id differ from the original's.
        connection_id=existing_connection.connection_id + uuid4().hex,
        network=existing_connection.network,
        user_id=user.id,
        wallet_address=canonical_address,
        created=now,
        updated=now,
        invalidated=False,
        without_pk=False
    )
    db_session.add(new_connection)
    await db_session.commit()

    return response.empty(status=200)