From 075a35b4415524e0c9b3459b583af3396cb3ba41 Mon Sep 17 00:00:00 2001 From: user Date: Sat, 13 Sep 2025 19:41:47 +0300 Subject: [PATCH] ipfs admin --- app/api/__init__.py | 14 +- app/api/routes/admin.py | 152 ++++++++++++++++++ .../background/derivative_cache_janitor.py | 14 +- app/core/background/index_scout_v3.py | 15 +- app/core/ipfs_client.py | 14 ++ 5 files changed, 200 insertions(+), 9 deletions(-) diff --git a/app/api/__init__.py b/app/api/__init__.py index 9c277e0..26750b6 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -32,7 +32,14 @@ from app.api.routes._blockchain import s_api_v1_blockchain_send_new_content_mess from app.api.routes.content import s_api_v1_content_list, s_api_v1_content_view, s_api_v1_content_friendly_list, s_api_v1_5_content_list from app.api.routes.content_index import s_api_v1_content_index, s_api_v1_content_delta from app.api.routes.derivatives import s_api_v1_content_derivatives -from app.api.routes.admin import s_api_v1_admin_node_setrole +from app.api.routes.admin import ( + s_api_v1_admin_node_setrole, + s_api_v1_admin_nodes, + s_api_v1_admin_status, + s_api_v1_admin_cache_setlimits, + s_api_v1_admin_cache_cleanup, + s_api_v1_admin_sync_setlimits, +) from app.api.routes.tonconnect import s_api_v1_tonconnect_new, s_api_v1_tonconnect_logout from app.api.routes.keys import s_api_v1_keys_request from app.api.routes.sync import s_api_v1_sync_pin, s_api_v1_sync_status @@ -80,6 +87,11 @@ app.add_route(s_api_v1_content_index, "/api/v1/content.index", methods=["GET", " app.add_route(s_api_v1_content_delta, "/api/v1/content.delta", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_content_derivatives, "/api/v1/content.derivatives", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_node_setrole, "/api/v1/admin.node.setRole", methods=["POST", "OPTIONS"]) +app.add_route(s_api_v1_admin_nodes, "/api/v1/admin.nodes", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_status, "/api/v1/admin.status", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_cache_setlimits, "/api/v1/admin.cache.setLimits", methods=["POST", "OPTIONS"]) +app.add_route(s_api_v1_admin_cache_cleanup, "/api/v1/admin.cache.cleanup", methods=["POST", "OPTIONS"]) +app.add_route(s_api_v1_admin_sync_setlimits, "/api/v1/admin.sync.setLimits", methods=["POST", "OPTIONS"]) # tusd HTTP hooks app.add_route(s_api_v1_upload_tus_hook, "/api/v1/upload.tus-hook", methods=["POST", "OPTIONS"]) diff --git a/app/api/routes/admin.py b/app/api/routes/admin.py index 863046b..17a119c 100644 --- a/app/api/routes/admin.py +++ b/app/api/routes/admin.py @@ -5,6 +5,9 @@ from sanic import response from sqlalchemy import select from app.core.models.my_network import KnownNode +from app.core.models.content_v3 import EncryptedContent, IpfsSync, ContentDerivative +from app.core.models._config import ServiceConfig +from app.core.ipfs_client import bitswap_stat, repo_stat def _auth_ok(request) -> bool: @@ -40,3 +43,152 @@ async def s_api_v1_admin_node_setrole(request): await session.commit() return response.json({"ok": True, "node": {"ip": row.ip, "public_key": row.public_key, "role": role}}) + +async def s_api_v1_admin_nodes(request): + if not _auth_ok(request): + return response.json({"error": "UNAUTHORIZED"}, status=401) + session = request.ctx.db_session + rows = (await session.execute(select(KnownNode))).scalars().all() + items = [] + for r in rows: + meta = r.meta or {} + items.append({ + "ip": r.ip, + "port": r.port, + "public_key": r.public_key, + "role": meta.get('role') or 'read-only', + "version": meta.get('version'), + "last_seen": (r.last_sync.isoformat() + 'Z') if r.last_sync else None, + }) + return response.json({"items": items}) + + +async def s_api_v1_admin_status(request): + if not _auth_ok(request): + return response.json({"error": "UNAUTHORIZED"}, status=401) + session = request.ctx.db_session + # DB metrics + pin_counts = {k: 0 for k in ('not_pinned','queued','pinning','pinned','failed')} + rows = (await session.execute(select(IpfsSync))).scalars().all() + for r in rows: + pin_counts[r.pin_state] = pin_counts.get(r.pin_state, 0) + 1 + deriv = (await session.execute(select(ContentDerivative))).scalars().all() + deriv_counts = { + 'ready': sum(1 for d in deriv if d.status=='ready'), + 'processing': sum(1 for d in deriv if d.status=='processing'), + 'pending': sum(1 for d in deriv if d.status=='pending'), + 'failed': sum(1 for d in deriv if d.status=='failed'), + } + total_deriv_bytes = sum(int(d.size_bytes or 0) for d in deriv) + # Backlog: number of EC needing conversion + ec = (await session.execute(select(EncryptedContent))).scalars().all() + backlog = 0 + for e in ec: + if not e.preview_enabled: + continue + kinds = [d.kind for d in deriv if d.content_id==e.id and d.status=='ready'] + req = {'decrypted_low','decrypted_high','decrypted_preview'} + if not req.issubset(set(kinds)): + backlog += 1 + # IPFS metrics + try: + bs = await bitswap_stat() + except Exception: + bs = {} + try: + rs = await repo_stat() + except Exception: + rs = {} + # Limits + cfg = ServiceConfig(session) + max_gb = await cfg.get('DERIVATIVE_CACHE_MAX_GB', os.getenv('DERIVATIVE_CACHE_MAX_GB','50')) + ttl_days = await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS','0')) + max_pins = await cfg.get('SYNC_MAX_CONCURRENT_PINS', os.getenv('SYNC_MAX_CONCURRENT_PINS','4')) + disk_pct = await cfg.get('SYNC_DISK_LOW_WATERMARK_PCT', os.getenv('SYNC_DISK_LOW_WATERMARK_PCT','90')) + return response.json({ + 'ipfs': {'bitswap': bs, 'repo': rs}, + 'pin_counts': pin_counts, + 'derivatives': {**deriv_counts, 'total_bytes': total_deriv_bytes}, + 'convert_backlog': backlog, + 'limits': { + 'DERIVATIVE_CACHE_MAX_GB': float(max_gb), + 'DERIVATIVE_CACHE_TTL_DAYS': int(ttl_days), + 'SYNC_MAX_CONCURRENT_PINS': int(max_pins), + 'SYNC_DISK_LOW_WATERMARK_PCT': int(disk_pct), + } + }) + + +async def s_api_v1_admin_cache_setlimits(request): + if not _auth_ok(request): + return response.json({"error": "UNAUTHORIZED"}, status=401) + data = request.json or {} + max_gb = float(data.get('max_gb')) + ttl_days = int(data.get('ttl_days')) + cfg = ServiceConfig(request.ctx.db_session) + await cfg.set('DERIVATIVE_CACHE_MAX_GB', max_gb) + await cfg.set('DERIVATIVE_CACHE_TTL_DAYS', ttl_days) + return response.json({"ok": True}) + + +async def s_api_v1_admin_cache_cleanup(request): + if not _auth_ok(request): + return response.json({"error": "UNAUTHORIZED"}, status=401) + data = request.json or {} + mode = (data.get('mode') or 'fit') + # reuse janitor logic inline + removed = 0 + from datetime import datetime, timedelta + if mode == 'ttl': + # Evict by TTL + async with request.ctx.db_session as session: + from app.core.models._config import ServiceConfig + ttl = int(await ServiceConfig(session).get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS','0'))) + if ttl > 0: + now = datetime.utcnow() + rows = (await request.ctx.db_session.execute(select(ContentDerivative).where(ContentDerivative.status=='ready'))).scalars().all() + for r in rows: + la = r.last_access_at or r.created_at + if la and (now - la) > timedelta(days=ttl): + try: + if r.local_path and os.path.exists(r.local_path): + os.remove(r.local_path) + except Exception: + pass + r.status='pending'; r.local_path=None; r.size_bytes=None; r.last_access_at=None + removed += 1 + await request.ctx.db_session.commit() + else: + # Fit to size + target_gb = float(data.get('max_gb') or 0) + if target_gb <= 0: + return response.json({"error": "BAD_MAX_GB"}, status=400) + limit_bytes = int(target_gb * (1024**3)) + rows = (await request.ctx.db_session.execute(select(ContentDerivative).where(ContentDerivative.status=='ready'))).scalars().all() + rows.sort(key=lambda r: (r.last_access_at or r.created_at or datetime.utcfromtimestamp(0))) + total = sum(int(r.size_bytes or 0) for r in rows) + for r in rows: + if total <= limit_bytes: + break + try: + if r.local_path and os.path.exists(r.local_path): + os.remove(r.local_path) + except Exception: + pass + total -= int(r.size_bytes or 0) + r.status='pending'; r.local_path=None; r.size_bytes=None; r.last_access_at=None + removed += 1 + await request.ctx.db_session.commit() + return response.json({"ok": True, "removed": removed}) + + +async def s_api_v1_admin_sync_setlimits(request): + if not _auth_ok(request): + return response.json({"error": "UNAUTHORIZED"}, status=401) + data = request.json or {} + max_pins = int(data.get('max_concurrent_pins')) + disk_pct = int(data.get('disk_low_watermark_pct')) + cfg = ServiceConfig(request.ctx.db_session) + await cfg.set('SYNC_MAX_CONCURRENT_PINS', max_pins) + await cfg.set('SYNC_DISK_LOW_WATERMARK_PCT', disk_pct) + return response.json({"ok": True}) diff --git a/app/core/background/derivative_cache_janitor.py b/app/core/background/derivative_cache_janitor.py index 70d0707..1898252 100644 --- a/app/core/background/derivative_cache_janitor.py +++ b/app/core/background/derivative_cache_janitor.py @@ -7,10 +7,11 @@ from sqlalchemy import select from app.core.logger import make_log from app.core.storage import db_session from app.core.models.content_v3 import ContentDerivative +from app.core.models._config import ServiceConfig -MAX_GB = float(os.getenv('DERIVATIVE_CACHE_MAX_GB', '50')) -TTL_DAYS = int(os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '30')) +ENV_MAX_GB = float(os.getenv('DERIVATIVE_CACHE_MAX_GB', '50')) +ENV_TTL_DAYS = int(os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0')) INTERVAL_SEC = int(os.getenv('DERIVATIVE_JANITOR_INTERVAL_SEC', '600')) @@ -22,7 +23,10 @@ async def _current_total_size() -> int: async def _evict_over_ttl(now: datetime) -> int: removed = 0 - if TTL_DAYS <= 0: + # Pull TTL from ServiceConfig each time + async with db_session() as session: + ttl_days = await ServiceConfig(session).get('DERIVATIVE_CACHE_TTL_DAYS', ENV_TTL_DAYS) + if int(ttl_days) <= 0: return 0 async with db_session() as session: rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() @@ -44,7 +48,9 @@ async def _evict_over_ttl(now: datetime) -> int: async def _evict_to_fit(): - limit_bytes = int(MAX_GB * (1024 ** 3)) + async with db_session() as session: + max_gb = await ServiceConfig(session).get('DERIVATIVE_CACHE_MAX_GB', ENV_MAX_GB) + limit_bytes = int(float(max_gb) * (1024 ** 3)) total = await _current_total_size() if total <= limit_bytes: return 0 diff --git a/app/core/background/index_scout_v3.py b/app/core/background/index_scout_v3.py index fab1bd8..0c16026 100644 --- a/app/core/background/index_scout_v3.py +++ b/app/core/background/index_scout_v3.py @@ -14,8 +14,8 @@ from app.core.ipfs_client import pin_add, find_providers, swarm_connect INTERVAL_SEC = 60 -PIN_CONCURRENCY = int(os.getenv('SYNC_MAX_CONCURRENT_PINS', '4')) -DISK_WATERMARK_PCT = int(os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90')) +ENV_PIN_CONCURRENCY = int(os.getenv('SYNC_MAX_CONCURRENT_PINS', '4')) +ENV_DISK_WATERMARK_PCT = int(os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90')) async def fetch_index(base_url: str, etag: str | None, since: str | None) -> tuple[List[dict], str | None]: @@ -104,9 +104,16 @@ async def upsert_content(item: dict): async def main_fn(memory): make_log('index_scout_v3', 'Service started', level='info') - sem = asyncio.Semaphore(PIN_CONCURRENCY) + sem = None while True: try: + # Read runtime config from ServiceConfig (fallback to env) + from app.core.models._config import ServiceConfig + async with db_session() as session: + max_pins = int(await ServiceConfig(session).get('SYNC_MAX_CONCURRENT_PINS', ENV_PIN_CONCURRENCY)) + disk_pct = int(await ServiceConfig(session).get('SYNC_DISK_LOW_WATERMARK_PCT', ENV_DISK_WATERMARK_PCT)) + if sem is None or sem._value != max_pins: + sem = asyncio.Semaphore(max_pins) async with db_session() as session: nodes = (await session.execute(select(KnownNode))).scalars().all() for n in nodes: @@ -140,7 +147,7 @@ async def main_fn(memory): from app.core._config import UPLOADS_DIR du = shutil.disk_usage(UPLOADS_DIR) used_pct = int(100 * (1 - du.free / du.total)) - if used_pct >= DISK_WATERMARK_PCT: + if used_pct >= disk_pct: make_log('index_scout_v3', f"Disk watermark reached ({used_pct}%), skipping pins") continue except Exception: diff --git a/app/core/ipfs_client.py b/app/core/ipfs_client.py index d1b5183..11cd3ad 100644 --- a/app/core/ipfs_client.py +++ b/app/core/ipfs_client.py @@ -103,3 +103,17 @@ async def find_providers(cid: str, max_results: int = 8): if len(out) >= max_results: return out return out + + +async def bitswap_stat() -> Dict[str, Any]: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post(f"{IPFS_API_URL}/api/v0/bitswap/stat") + r.raise_for_status() + return r.json() + + +async def repo_stat() -> Dict[str, Any]: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post(f"{IPFS_API_URL}/api/v0/repo/stat") + r.raise_for_status() + return r.json()