"""Admin HTTP handlers: node roles, node listing, status metrics, cache and sync limits.

Every handler requires an ``Authorization: Bearer <ADMIN_API_TOKEN>`` header and
answers with JSON.
"""
from __future__ import annotations

import hmac
import logging
import math
import os
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone

from sanic import response
from sqlalchemy import select

from app.core.models.my_network import KnownNode
from app.core.models.content_v3 import EncryptedContent, IpfsSync, ContentDerivative
from app.core.models._config import ServiceConfig
from app.core.ipfs_client import bitswap_stat, repo_stat

_log = logging.getLogger(__name__)

_ALLOWED_ROLES = ('trusted', 'read-only', 'deny')
_PIN_STATES = ('not_pinned', 'queued', 'pinning', 'pinned', 'failed')
_DERIVATIVE_STATUSES = ('ready', 'processing', 'pending', 'failed')
_REQUIRED_DERIVATIVE_KINDS = frozenset(
    {'decrypted_low', 'decrypted_high', 'decrypted_preview'}
)
# Sort key fallback for derivatives that have neither access nor creation time.
_EPOCH = datetime(1970, 1, 1)


def _auth_ok(request) -> bool:
    """Return True when the request carries the configured admin bearer token.

    Authentication is refused outright when ``ADMIN_API_TOKEN`` is unset or empty.
    """
    token = os.getenv('ADMIN_API_TOKEN')
    if not token:
        return False
    supplied = (request.headers.get('Authorization', '') or '').strip()
    expected = f"Bearer {token}"
    # Constant-time comparison so response timing does not leak the token.
    # Compare bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))


def _unauthorized():
    """Build the 401 response shared by all handlers."""
    return response.json({"error": "UNAUTHORIZED"}, status=401)


def _json_body(request) -> dict:
    """Return the JSON body as a dict; a missing or non-object body yields ``{}``."""
    body = request.json
    return body if isinstance(body, dict) else {}


def _to_float(value) -> float | None:
    """Convert *value* to a finite float, or return None when that is impossible."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def _to_int(value) -> int | None:
    """Convert *value* to an int, or return None when that is impossible."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive datetime (same value as ``utcnow()``)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


async def _best_effort(stat_call) -> dict:
    """Await *stat_call()* and return its result, or ``{}`` if it raises.

    The status endpoint must keep answering even when the IPFS daemon is
    unreachable, so any failure here is deliberately reduced to an empty dict.
    """
    try:
        return await stat_call()
    except Exception:
        return {}


def _evict(row) -> None:
    """Delete a derivative's local file (best effort) and reset the row to 'pending'."""
    if row.local_path:
        try:
            os.remove(row.local_path)
        except FileNotFoundError:
            pass  # Already gone; nothing to clean up.
        except OSError as exc:
            # Best effort: the row is still reset below, but leave a trace.
            _log.warning("could not remove derivative file %s: %s", row.local_path, exc)
    row.status = 'pending'
    row.local_path = None
    row.size_bytes = None
    row.last_access_at = None


async def s_api_v1_admin_node_setrole(request):
    """Set the role of a known node identified by ``public_key`` or ``host``.

    Body fields: ``role`` (one of 'trusted', 'read-only', 'deny') and at least
    one of ``public_key`` / ``host``. The public key is looked up first; the
    host (matched against ``KnownNode.ip``) is the fallback.

    Responses: 401 UNAUTHORIZED, 400 BAD_ROLE, 400 MISSING_TARGET,
    404 NOT_FOUND, or 200 with the updated node summary.
    """
    if not _auth_ok(request):
        return _unauthorized()
    data = _json_body(request)

    role = (data.get('role') or '').strip()
    if role not in _ALLOWED_ROLES:
        return response.json({"error": "BAD_ROLE"}, status=400)

    pub = (data.get('public_key') or '').strip()
    host = (data.get('host') or '').strip()
    if not pub and not host:
        return response.json({"error": "MISSING_TARGET"}, status=400)

    session = request.ctx.db_session
    row = None
    if pub:
        result = await session.execute(select(KnownNode).where(KnownNode.public_key == pub))
        row = result.scalars().first()
    if not row and host:
        result = await session.execute(select(KnownNode).where(KnownNode.ip == host))
        row = result.scalars().first()
    if not row:
        return response.json({"error": "NOT_FOUND"}, status=404)

    # Assign a fresh dict: mutating the loaded dict in place and re-assigning
    # the same object may not register as a change on a plain JSON column,
    # in which case the new role would never be written.
    row.meta = {**(row.meta or {}), 'role': role}
    await session.commit()
    return response.json(
        {"ok": True, "node": {"ip": row.ip, "public_key": row.public_key, "role": role}}
    )


async def s_api_v1_admin_nodes(request):
    """List all known nodes with their role, version and last sync time.

    A node without a stored role is reported as 'read-only'.
    """
    if not _auth_ok(request):
        return _unauthorized()
    session = request.ctx.db_session
    rows = (await session.execute(select(KnownNode))).scalars().all()

    items = []
    for node in rows:
        meta = node.meta or {}
        items.append({
            "ip": node.ip,
            "port": node.port,
            "public_key": node.public_key,
            "role": meta.get('role') or 'read-only',
            "version": meta.get('version'),
            "last_seen": (node.last_sync.isoformat() + 'Z') if node.last_sync else None,
        })
    return response.json({"items": items})


async def s_api_v1_admin_status(request):
    """Report pin-state counts, derivative stats, convert backlog, IPFS stats and limits.

    ``convert_backlog`` counts EncryptedContent rows with ``preview_enabled``
    set that do not yet have a 'ready' derivative of every required kind.
    """
    if not _auth_ok(request):
        return _unauthorized()
    session = request.ctx.db_session

    # DB metrics. Known states are pre-seeded with 0; unexpected states are
    # still counted under their own key.
    pin_counts = {state: 0 for state in _PIN_STATES}
    for sync in (await session.execute(select(IpfsSync))).scalars().all():
        pin_counts[sync.pin_state] = pin_counts.get(sync.pin_state, 0) + 1

    derivatives = (await session.execute(select(ContentDerivative))).scalars().all()
    by_status = Counter(d.status for d in derivatives)
    deriv_counts = {status: by_status.get(status, 0) for status in _DERIVATIVE_STATUSES}
    total_deriv_bytes = sum(int(d.size_bytes or 0) for d in derivatives)

    # Backlog: index ready derivative kinds per content once, instead of
    # rescanning every derivative for every content row.
    ready_kinds = defaultdict(set)
    for d in derivatives:
        if d.status == 'ready':
            ready_kinds[d.content_id].add(d.kind)
    contents = (await session.execute(select(EncryptedContent))).scalars().all()
    no_kinds = frozenset()
    backlog = sum(
        1
        for content in contents
        if content.preview_enabled
        and not _REQUIRED_DERIVATIVE_KINDS.issubset(ready_kinds.get(content.id, no_kinds))
    )

    # IPFS metrics (best effort).
    bs = await _best_effort(bitswap_stat)
    rs = await _best_effort(repo_stat)

    # Limits: stored config first, environment (then built-in default) as fallback.
    cfg = ServiceConfig(session)
    max_gb = await cfg.get('DERIVATIVE_CACHE_MAX_GB', os.getenv('DERIVATIVE_CACHE_MAX_GB', '50'))
    ttl_days = await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0'))
    max_pins = await cfg.get('SYNC_MAX_CONCURRENT_PINS', os.getenv('SYNC_MAX_CONCURRENT_PINS', '4'))
    disk_pct = await cfg.get('SYNC_DISK_LOW_WATERMARK_PCT', os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90'))

    return response.json({
        'ipfs': {'bitswap': bs, 'repo': rs},
        'pin_counts': pin_counts,
        'derivatives': {**deriv_counts, 'total_bytes': total_deriv_bytes},
        'convert_backlog': backlog,
        'limits': {
            'DERIVATIVE_CACHE_MAX_GB': float(max_gb),
            'DERIVATIVE_CACHE_TTL_DAYS': int(ttl_days),
            'SYNC_MAX_CONCURRENT_PINS': int(max_pins),
            'SYNC_DISK_LOW_WATERMARK_PCT': int(disk_pct),
        },
    })


async def s_api_v1_admin_cache_setlimits(request):
    """Store the derivative cache limits from body fields ``max_gb`` and ``ttl_days``.

    Responses: 401 UNAUTHORIZED, 400 BAD_MAX_GB / BAD_TTL_DAYS when a field is
    missing or not numeric, otherwise 200 ``{"ok": true}``.
    """
    if not _auth_ok(request):
        return _unauthorized()
    data = _json_body(request)

    max_gb = _to_float(data.get('max_gb'))
    if max_gb is None:
        return response.json({"error": "BAD_MAX_GB"}, status=400)
    ttl_days = _to_int(data.get('ttl_days'))
    if ttl_days is None:
        return response.json({"error": "BAD_TTL_DAYS"}, status=400)

    # NOTE(review): no commit here, as in the original; presumably
    # ServiceConfig.set persists on its own — confirm.
    cfg = ServiceConfig(request.ctx.db_session)
    await cfg.set('DERIVATIVE_CACHE_MAX_GB', max_gb)
    await cfg.set('DERIVATIVE_CACHE_TTL_DAYS', ttl_days)
    return response.json({"ok": True})


async def s_api_v1_admin_cache_cleanup(request):
    """Evict 'ready' derivatives from the local cache.

    Body field ``mode``:
      * ``'ttl'`` — evict derivatives whose last access (or creation) time is
        older than the configured ``DERIVATIVE_CACHE_TTL_DAYS``; a TTL of 0 or
        less evicts nothing.
      * anything else (default ``'fit'``) — evict least-recently-used
        derivatives until the total size fits within ``max_gb`` from the body.

    Evicted rows are reset to status 'pending' with path, size and access time
    cleared. Responses: 401 UNAUTHORIZED, 400 BAD_MAX_GB (fit mode), or 200
    with the number of evicted rows in ``removed``.
    """
    if not _auth_ok(request):
        return _unauthorized()
    data = _json_body(request)
    mode = data.get('mode') or 'fit'
    # Use the request-scoped session directly, like every other handler;
    # entering it as a context manager would close it mid-request.
    session = request.ctx.db_session
    ready_query = select(ContentDerivative).where(ContentDerivative.status == 'ready')
    removed = 0

    if mode == 'ttl':
        ttl = int(await ServiceConfig(session).get(
            'DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0')
        ))
        if ttl > 0:
            now = _utcnow_naive()
            max_age = timedelta(days=ttl)
            for row in (await session.execute(ready_query)).scalars().all():
                last_used = row.last_access_at or row.created_at
                if last_used and (now - last_used) > max_age:
                    _evict(row)
                    removed += 1
        await session.commit()
    else:
        # Fit to size.
        target_gb = _to_float(data.get('max_gb') or 0)
        if target_gb is None or target_gb <= 0:
            return response.json({"error": "BAD_MAX_GB"}, status=400)
        limit_bytes = int(target_gb * (1024 ** 3))

        rows = sorted(
            (await session.execute(ready_query)).scalars().all(),
            key=lambda r: r.last_access_at or r.created_at or _EPOCH,
        )
        total = sum(int(r.size_bytes or 0) for r in rows)
        for row in rows:
            if total <= limit_bytes:
                break
            # Subtract before _evict clears size_bytes.
            total -= int(row.size_bytes or 0)
            _evict(row)
            removed += 1
        await session.commit()

    return response.json({"ok": True, "removed": removed})


async def s_api_v1_admin_sync_setlimits(request):
    """Store sync limits from ``max_concurrent_pins`` and ``disk_low_watermark_pct``.

    Responses: 401 UNAUTHORIZED, 400 BAD_MAX_CONCURRENT_PINS /
    BAD_DISK_LOW_WATERMARK_PCT when a field is missing or not an integer,
    otherwise 200 ``{"ok": true}``.
    """
    if not _auth_ok(request):
        return _unauthorized()
    data = _json_body(request)

    max_pins = _to_int(data.get('max_concurrent_pins'))
    if max_pins is None:
        return response.json({"error": "BAD_MAX_CONCURRENT_PINS"}, status=400)
    disk_pct = _to_int(data.get('disk_low_watermark_pct'))
    if disk_pct is None:
        return response.json({"error": "BAD_DISK_LOW_WATERMARK_PCT"}, status=400)

    # NOTE(review): no commit here, as in the original; presumably
    # ServiceConfig.set persists on its own — confirm.
    cfg = ServiceConfig(request.ctx.db_session)
    await cfg.set('SYNC_MAX_CONCURRENT_PINS', max_pins)
    await cfg.set('SYNC_DISK_LOW_WATERMARK_PCT', disk_pct)
    return response.json({"ok": True})