43 lines
1.4 KiB
Python
43 lines
1.4 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from sanic import response
|
|
from sqlalchemy import select
|
|
|
|
from app.core.models.my_network import KnownNode
|
|
|
|
|
|
def _auth_ok(request) -> bool:
|
|
token = os.getenv('ADMIN_API_TOKEN')
|
|
if not token:
|
|
return False
|
|
auth = request.headers.get('Authorization', '')
|
|
return auth.strip() == f"Bearer {token}"
|
|
|
|
|
|
async def s_api_v1_admin_node_setrole(request):
|
|
if not _auth_ok(request):
|
|
return response.json({"error": "UNAUTHORIZED"}, status=401)
|
|
data = request.json or {}
|
|
role = (data.get('role') or '').strip()
|
|
if role not in ('trusted', 'read-only', 'deny'):
|
|
return response.json({"error": "BAD_ROLE"}, status=400)
|
|
pub = (data.get('public_key') or '').strip()
|
|
host = (data.get('host') or '').strip()
|
|
if not pub and not host:
|
|
return response.json({"error": "MISSING_TARGET"}, status=400)
|
|
session = request.ctx.db_session
|
|
row = None
|
|
if pub:
|
|
row = (await session.execute(select(KnownNode).where(KnownNode.public_key == pub))).scalars().first()
|
|
if not row and host:
|
|
row = (await session.execute(select(KnownNode).where(KnownNode.ip == host))).scalars().first()
|
|
if not row:
|
|
return response.json({"error": "NOT_FOUND"}, status=404)
|
|
meta = row.meta or {}
|
|
meta['role'] = role
|
|
row.meta = meta
|
|
await session.commit()
|
|
return response.json({"ok": True, "node": {"ip": row.ip, "public_key": row.public_key, "role": role}})
|
|
|