from __future__ import annotations import os import platform as py_platform import shutil from collections import defaultdict from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from base58 import b58encode from sanic import response from sqlalchemy import func, select from app.api.routes._system import get_git_info from app.core._blockchain.ton.platform import platform from app.core._config import ( BACKEND_DATA_DIR_HOST, BACKEND_LOGS_DIR_HOST, LOG_DIR, PROJECT_HOST, UPLOADS_DIR, ) from app.core._secrets import hot_pubkey, service_wallet from app.core.ipfs_client import bitswap_stat, id_info, repo_stat from app.core.logger import make_log from app.core.models._config import ServiceConfig, ServiceConfigValue from app.core.models.content_v3 import ( ContentDerivative, ContentIndexItem, EncryptedContent, IpfsSync, UploadSession, ) from app.core.models.my_network import KnownNode from app.core.models.tasks import BlockchainTask ADMIN_COOKIE_NAME = os.getenv('ADMIN_COOKIE_NAME', 'admin_session') ADMIN_COOKIE_MAX_AGE = int(os.getenv('ADMIN_COOKIE_MAX_AGE', '172800')) # 48h default ADMIN_COOKIE_SAMESITE = os.getenv('ADMIN_COOKIE_SAMESITE', 'Lax') ADMIN_COOKIE_SECURE_MODE = os.getenv('ADMIN_COOKIE_SECURE', 'auto').lower() ADMIN_HEADER_NAME = os.getenv('ADMIN_HEADER_NAME', 'X-Admin-Token') def _cookie_secure_flag(request) -> bool: if ADMIN_COOKIE_SECURE_MODE == 'true': return True if ADMIN_COOKIE_SECURE_MODE == 'false': return False # auto mode: follow request scheme return getattr(request, 'scheme', 'http') == 'https' def _set_admin_cookie(resp, request, value: str, max_age: Optional[int] = None): resp.cookies[ADMIN_COOKIE_NAME] = value cookie = resp.cookies[ADMIN_COOKIE_NAME] cookie['path'] = '/' cookie['httponly'] = True cookie['samesite'] = ADMIN_COOKIE_SAMESITE cookie['secure'] = _cookie_secure_flag(request) if max_age is not None: cookie['max-age'] = max_age def _clear_admin_cookie(resp, request): _set_admin_cookie(resp, request, '', max_age=0) def _get_admin_header(request) -> Optional[str]: target = ADMIN_HEADER_NAME.lower() for key, value in request.headers.items(): if key.lower() == target: return value return None def _auth_ok(request) -> bool: token = os.getenv('ADMIN_API_TOKEN') if not token: return False cookie_value = request.cookies.get(ADMIN_COOKIE_NAME) if cookie_value == token: return True header_value = _get_admin_header(request) if not header_value: return False if header_value.startswith('Bearer '): header_value = header_value.split(' ', 1)[1].strip() return header_value == token def _unauthorized(): return response.json({"error": "UNAUTHORIZED"}, status=401) def _ensure_admin(request): if not _auth_ok(request): return _unauthorized() return None def _dir_stats(label: str, path: str) -> Dict[str, Any]: target = Path(path) exists = target.exists() size = 0 files = 0 if exists: if target.is_file(): try: stat = target.stat() size = stat.st_size files = 1 except OSError: pass else: for child in target.rglob('*'): try: if child.is_file(): files += 1 size += child.stat().st_size except OSError: continue return { 'label': label, 'path': str(target), 'exists': exists, 'file_count': files, 'size_bytes': size, } def _service_states(request) -> List[Dict[str, Any]]: now = datetime.utcnow() items: List[Dict[str, Any]] = [] memory = getattr(request.app.ctx, 'memory', None) known_states = getattr(memory, 'known_states', {}) if memory else {} if isinstance(known_states, dict): for name, payload in known_states.items(): ts: Optional[datetime] = payload.get('timestamp') if isinstance(payload, dict) else None delay = (now - ts).total_seconds() if ts else None healthy = delay is not None and delay < 120 items.append({ 'name': name, 'status': payload.get('status') if healthy else 'not working: timeout', 'last_reported_seconds': delay, }) items.sort(key=lambda item: item['name']) return items async def s_api_v1_admin_login(request): token = os.getenv('ADMIN_API_TOKEN') if not token: make_log('Admin', 'ADMIN_API_TOKEN is not configured', level='error') return response.json({"error": "ADMIN_TOKEN_NOT_CONFIGURED"}, status=500) payload = request.json or {} provided = (payload.get('secret') or '').strip() if provided != token: resp = response.json({"error": "UNAUTHORIZED"}, status=401) _clear_admin_cookie(resp, request) return resp resp = response.json({ "ok": True, "cookie_name": ADMIN_COOKIE_NAME, "header_name": ADMIN_HEADER_NAME, "max_age": ADMIN_COOKIE_MAX_AGE, }) _set_admin_cookie(resp, request, token, ADMIN_COOKIE_MAX_AGE) return resp async def s_api_v1_admin_logout(request): resp = response.json({"ok": True}) _clear_admin_cookie(resp, request) return resp async def s_api_v1_admin_overview(request): if (unauth := _ensure_admin(request)): return unauth session = request.ctx.db_session branch, commit = get_git_info() try: ipfs_identity = await id_info() except Exception as exc: # pragma: no cover - network failure path ipfs_identity = {"error": str(exc)} try: bitswap = await bitswap_stat() except Exception as exc: # pragma: no cover - network failure path bitswap = {"error": str(exc)} try: repo = await repo_stat() except Exception as exc: # pragma: no cover - network failure path repo = {"error": str(exc)} # Database counters encrypted_total = (await session.execute(select(func.count()).select_from(EncryptedContent))).scalar_one() upload_total = (await session.execute(select(func.count()).select_from(UploadSession))).scalar_one() derivative_ready = (await session.execute( select(func.count()).select_from(ContentDerivative).where(ContentDerivative.status == 'ready') )).scalar_one() node_id = b58encode(hot_pubkey).decode() overview_payload = { 'project': { 'host': PROJECT_HOST, 'name': os.getenv('PROJECT_NAME', 'unknown'), 'privacy': os.getenv('NODE_PRIVACY', 'public'), }, 'codebase': { 'branch': branch, 'commit': commit, }, 'node': { 'id': node_id, 'service_wallet': service_wallet.address.to_string(1, 1, 1), 'ton_master': platform.address.to_string(1, 1, 1), }, 'runtime': { 'python': py_platform.python_version(), 'implementation': py_platform.python_implementation(), 'platform': py_platform.platform(), 'utc_now': datetime.utcnow().isoformat() + 'Z', }, 'ipfs': { 'identity': ipfs_identity, 'bitswap': bitswap, 'repo': repo, }, 'content': { 'encrypted_total': int(encrypted_total or 0), 'upload_sessions_total': int(upload_total or 0), 'derivatives_ready': int(derivative_ready or 0), }, 'ton': { 'host': os.getenv('TONCENTER_HOST'), 'api_key_configured': bool(os.getenv('TONCENTER_API_KEY')), 'testnet': bool(int(os.getenv('TESTNET', '0'))), }, 'services': _service_states(request), } return response.json(overview_payload) async def s_api_v1_admin_storage(request): if (unauth := _ensure_admin(request)): return unauth session = request.ctx.db_session directories: List[Dict[str, Any]] = [] directories.append(_dir_stats('Encrypted uploads', UPLOADS_DIR)) directories.append(_dir_stats('Backend logs', LOG_DIR)) extra_dirs = { 'Host content mount': BACKEND_DATA_DIR_HOST, 'Host logs mount': BACKEND_LOGS_DIR_HOST, 'Tus staging': os.getenv('TUSD_DATA_DIR_HOST', ''), } for label, path in extra_dirs.items(): if path: directories.append(_dir_stats(label, path)) disk_snapshot: Optional[Dict[str, Any]] = None for entry in directories: if entry['exists']: try: usage = shutil.disk_usage(entry['path']) except Exception: continue disk_snapshot = { 'path': entry['path'], 'total_bytes': usage.total, 'used_bytes': usage.total - usage.free, 'free_bytes': usage.free, 'percent_used': round((usage.total - usage.free) / usage.total * 100, 2) if usage.total else None, } break derivatives = (await session.execute(select(ContentDerivative))).scalars().all() derivative_stats = { 'ready': sum(1 for d in derivatives if d.status == 'ready'), 'processing': sum(1 for d in derivatives if d.status == 'processing'), 'pending': sum(1 for d in derivatives if d.status == 'pending'), 'failed': sum(1 for d in derivatives if d.status == 'failed'), 'total_bytes': sum(int(d.size_bytes or 0) for d in derivatives if d.size_bytes), } storage_payload = { 'directories': directories, 'disk': disk_snapshot, 'derivatives': derivative_stats, } return response.json(storage_payload) async def s_api_v1_admin_uploads(request): if (unauth := _ensure_admin(request)): return unauth session = request.ctx.db_session counts_rows = (await session.execute( select(UploadSession.state, func.count()).group_by(UploadSession.state) )).all() counts = {state: int(count) for state, count in counts_rows} total = sum(counts.values()) recent_rows = (await session.execute( select(UploadSession).order_by(UploadSession.updated_at.desc()).limit(25) )).scalars().all() recent = [ { 'id': row.id, 'filename': row.filename, 'size_bytes': row.size_bytes, 'state': row.state, 'encrypted_cid': row.encrypted_cid, 'error': row.error, 'updated_at': row.updated_at.isoformat() + 'Z', 'created_at': row.created_at.isoformat() + 'Z', } for row in recent_rows ] payload = { 'total': total, 'states': counts, 'recent': recent, } return response.json(payload) async def s_api_v1_admin_system(request): if (unauth := _ensure_admin(request)): return unauth session = request.ctx.db_session config_rows = (await session.execute(select(ServiceConfigValue).order_by(ServiceConfigValue.key))).scalars().all() config_payload = [ {'key': row.key, 'value': row.value, 'raw': row.packed_value} for row in config_rows ] env_summary = { 'PROJECT_NAME': os.getenv('PROJECT_NAME'), 'PROJECT_HOST': PROJECT_HOST, 'NODE_PRIVACY': os.getenv('NODE_PRIVACY'), 'SANIC_PORT': os.getenv('SANIC_PORT'), 'LOG_LEVEL': os.getenv('LOG_LEVEL'), 'TESTNET': os.getenv('TESTNET'), } blockchain_counts_rows = (await session.execute( select(BlockchainTask.status, func.count()).group_by(BlockchainTask.status) )).all() blockchain_counts = {status: int(count) for status, count in blockchain_counts_rows} latest_index = (await session.execute( select(ContentIndexItem).order_by(ContentIndexItem.updated_at.desc()).limit(5) )).scalars().all() index_entries = [ { 'encrypted_cid': item.encrypted_cid, 'updated_at': item.updated_at.isoformat() + 'Z', } for item in latest_index ] payload = { 'env': env_summary, 'service_config': config_payload, 'services': _service_states(request), 'blockchain_tasks': blockchain_counts, 'latest_index_items': index_entries, } return response.json(payload) async def s_api_v1_admin_blockchain(request): if (unauth := _ensure_admin(request)): return unauth session = request.ctx.db_session counts_rows = (await session.execute( select(BlockchainTask.status, func.count()).group_by(BlockchainTask.status) )).all() counts = {status: int(count) for status, count in counts_rows} recent_rows = (await session.execute( select(BlockchainTask).order_by(BlockchainTask.updated.desc()).limit(20) )).scalars().all() recent = [ { 'id': task.id, 'destination': task.destination, 'amount': task.amount, 'status': task.status, 'epoch': task.epoch, 'seqno': task.seqno, 'transaction_hash': task.transaction_hash, 'updated': task.updated.isoformat() + 'Z', } for task in recent_rows ] payload = { 'counts': counts, 'recent': recent, } return response.json(payload) async def s_api_v1_admin_node_setrole(request): if (unauth := _ensure_admin(request)): return unauth data = request.json or {} role = (data.get('role') or '').strip() if role not in ('trusted', 'read-only', 'deny'): return response.json({"error": "BAD_ROLE"}, status=400) pub = (data.get('public_key') or '').strip() host = (data.get('host') or '').strip() if not pub and not host: return response.json({"error": "MISSING_TARGET"}, status=400) session = request.ctx.db_session row = None if pub: row = (await session.execute(select(KnownNode).where(KnownNode.public_key == pub))).scalars().first() if not row and host: row = (await session.execute(select(KnownNode).where(KnownNode.ip == host))).scalars().first() if not row: return response.json({"error": "NOT_FOUND"}, status=404) meta = row.meta or {} meta['role'] = role row.meta = meta await session.commit() return response.json({"ok": True, "node": {"ip": row.ip, "public_key": row.public_key, "role": role}}) async def s_api_v1_admin_nodes(request): if (unauth := _ensure_admin(request)): return unauth session = request.ctx.db_session rows = (await session.execute(select(KnownNode))).scalars().all() items = [] for r in rows: meta = r.meta or {} items.append({ "ip": r.ip, "port": r.port, "public_key": r.public_key, "role": meta.get('role') or 'read-only', "version": meta.get('version'), "last_seen": (r.last_sync.isoformat() + 'Z') if r.last_sync else None, "notes": meta.get('notes'), }) return response.json({"items": items}) async def s_api_v1_admin_status(request): if (unauth := _ensure_admin(request)): return unauth session = request.ctx.db_session pin_counts: Dict[str, int] = defaultdict(int) rows = (await session.execute(select(IpfsSync))).scalars().all() for r in rows: pin_counts[r.pin_state] += 1 deriv = (await session.execute(select(ContentDerivative))).scalars().all() deriv_counts = { 'ready': sum(1 for d in deriv if d.status == 'ready'), 'processing': sum(1 for d in deriv if d.status == 'processing'), 'pending': sum(1 for d in deriv if d.status == 'pending'), 'failed': sum(1 for d in deriv if d.status == 'failed'), } total_deriv_bytes = sum(int(d.size_bytes or 0) for d in deriv) ec = (await session.execute(select(EncryptedContent))).scalars().all() backlog = 0 for e in ec: if not e.preview_enabled: continue kinds = [d.kind for d in deriv if d.content_id == e.id and d.status == 'ready'] req = {'decrypted_low', 'decrypted_high', 'decrypted_preview'} if not req.issubset(set(kinds)): backlog += 1 try: bs = await bitswap_stat() except Exception: bs = {} try: rs = await repo_stat() except Exception: rs = {} cfg = ServiceConfig(session) max_gb = await cfg.get('DERIVATIVE_CACHE_MAX_GB', os.getenv('DERIVATIVE_CACHE_MAX_GB', '50')) ttl_days = await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0')) max_pins = await cfg.get('SYNC_MAX_CONCURRENT_PINS', os.getenv('SYNC_MAX_CONCURRENT_PINS', '4')) disk_pct = await cfg.get('SYNC_DISK_LOW_WATERMARK_PCT', os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90')) return response.json({ 'ipfs': {'bitswap': bs, 'repo': rs}, 'pin_counts': dict(pin_counts), 'derivatives': {**deriv_counts, 'total_bytes': total_deriv_bytes}, 'convert_backlog': backlog, 'limits': { 'DERIVATIVE_CACHE_MAX_GB': float(max_gb), 'DERIVATIVE_CACHE_TTL_DAYS': int(ttl_days), 'SYNC_MAX_CONCURRENT_PINS': int(max_pins), 'SYNC_DISK_LOW_WATERMARK_PCT': int(disk_pct), } }) async def s_api_v1_admin_cache_setlimits(request): if (unauth := _ensure_admin(request)): return unauth data = request.json or {} max_gb = float(data.get('max_gb')) ttl_days = int(data.get('ttl_days')) cfg = ServiceConfig(request.ctx.db_session) await cfg.set('DERIVATIVE_CACHE_MAX_GB', max_gb) await cfg.set('DERIVATIVE_CACHE_TTL_DAYS', ttl_days) return response.json({"ok": True}) async def s_api_v1_admin_cache_cleanup(request): if (unauth := _ensure_admin(request)): return unauth data = request.json or {} mode = (data.get('mode') or 'fit') removed = 0 from datetime import timedelta session = request.ctx.db_session if mode == 'ttl': cfg = ServiceConfig(session) ttl = int(await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0'))) if ttl > 0: now = datetime.utcnow() rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() for r in rows: la = r.last_access_at or r.created_at if la and (now - la) > timedelta(days=ttl): try: if r.local_path and os.path.exists(r.local_path): os.remove(r.local_path) except Exception: pass r.status = 'pending' r.local_path = None r.size_bytes = None r.last_access_at = None removed += 1 await session.commit() else: target_gb = float(data.get('max_gb') or 0) if target_gb <= 0: return response.json({"error": "BAD_MAX_GB"}, status=400) limit_bytes = int(target_gb * (1024 ** 3)) rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() rows.sort(key=lambda r: (r.last_access_at or r.created_at or datetime.utcfromtimestamp(0))) total = sum(int(r.size_bytes or 0) for r in rows) for r in rows: if total <= limit_bytes: break try: if r.local_path and os.path.exists(r.local_path): os.remove(r.local_path) except Exception: pass total -= int(r.size_bytes or 0) r.status = 'pending' r.local_path = None r.size_bytes = None r.last_access_at = None removed += 1 await session.commit() return response.json({"ok": True, "removed": removed}) async def s_api_v1_admin_sync_setlimits(request): if (unauth := _ensure_admin(request)): return unauth data = request.json or {} max_pins = int(data.get('max_concurrent_pins')) disk_pct = int(data.get('disk_low_watermark_pct')) cfg = ServiceConfig(request.ctx.db_session) await cfg.set('SYNC_MAX_CONCURRENT_PINS', max_pins) await cfg.set('SYNC_DISK_LOW_WATERMARK_PCT', disk_pct) return response.json({"ok": True})