diff --git a/app/api/__init__.py b/app/api/__init__.py index 26750b6..2a41ebb 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -33,12 +33,19 @@ from app.api.routes.content import s_api_v1_content_list, s_api_v1_content_view, from app.api.routes.content_index import s_api_v1_content_index, s_api_v1_content_delta from app.api.routes.derivatives import s_api_v1_content_derivatives from app.api.routes.admin import ( + s_api_v1_admin_blockchain, + s_api_v1_admin_cache_cleanup, + s_api_v1_admin_cache_setlimits, + s_api_v1_admin_login, + s_api_v1_admin_logout, s_api_v1_admin_node_setrole, s_api_v1_admin_nodes, + s_api_v1_admin_overview, s_api_v1_admin_status, - s_api_v1_admin_cache_setlimits, - s_api_v1_admin_cache_cleanup, + s_api_v1_admin_storage, s_api_v1_admin_sync_setlimits, + s_api_v1_admin_system, + s_api_v1_admin_uploads, ) from app.api.routes.tonconnect import s_api_v1_tonconnect_new, s_api_v1_tonconnect_logout from app.api.routes.keys import s_api_v1_keys_request @@ -86,6 +93,13 @@ app.add_route(s_api_v1_5_content_list, "/api/v1.5/content.list", methods=["GET", app.add_route(s_api_v1_content_index, "/api/v1/content.index", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_content_delta, "/api/v1/content.delta", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_content_derivatives, "/api/v1/content.derivatives", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_login, "/api/v1/admin.login", methods=["POST", "OPTIONS"]) +app.add_route(s_api_v1_admin_logout, "/api/v1/admin.logout", methods=["POST", "OPTIONS"]) +app.add_route(s_api_v1_admin_overview, "/api/v1/admin.overview", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_storage, "/api/v1/admin.storage", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_uploads, "/api/v1/admin.uploads", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_system, "/api/v1/admin.system", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_blockchain, "/api/v1/admin.blockchain", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_node_setrole, "/api/v1/admin.node.setRole", methods=["POST", "OPTIONS"]) app.add_route(s_api_v1_admin_nodes, "/api/v1/admin.nodes", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_status, "/api/v1/admin.status", methods=["GET", "OPTIONS"]) @@ -156,10 +170,4 @@ async def s_handle_exception(request, exception): response_buffer = response.json(payload, status=status) response_buffer = await close_db_session(request, response_buffer) - response_buffer.headers["Access-Control-Allow-Origin"] = "*" - response_buffer.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" - response_buffer.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-request-id" - response_buffer.headers["Access-Control-Allow-Credentials"] = "true" - response_buffer.headers["X-Session-Id"] = session_id - response_buffer.headers["X-Error-Id"] = error_id return response_buffer diff --git a/app/api/middleware.py b/app/api/middleware.py index 34cbbae..ef63785 100644 --- a/app/api/middleware.py +++ b/app/api/middleware.py @@ -18,19 +18,15 @@ from app.core.log_context import ( def attach_headers(response, request=None): + response.headers.pop("Access-Control-Allow-Origin", None) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" response.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-file-name, x-last-chunk, x-chunk-start, x-upload-id, x-request-id" - # response.headers["Access-Control-Allow-Credentials"] = "true" - try: - sid = getattr(request.ctx, 'session_id', None) if request else None - if sid: - response.headers["X-Session-Id"] = sid - except BaseException: - pass + response.headers.pop("Access-Control-Allow-Credentials", None) return response + async def try_authorization(request): token = request.headers.get("Authorization") if not token: @@ -200,6 +196,8 @@ async def close_request_handler(request, response): if request.method == 'OPTIONS': response = sanic_response.text("OK") + response = attach_headers(response, request) + try: await request.ctx.db_session.close() except BaseException: @@ -214,14 +212,11 @@ async def close_request_handler(request, response): except BaseException: pass - response = attach_headers(response, request) - return request, response async def close_db_session(request, response): request, response = await close_request_handler(request, response) - response = attach_headers(response, request) # Clear contextvars try: ctx_session_id.set(None) diff --git a/app/api/routes/admin.py b/app/api/routes/admin.py index 17a119c..fbbedc8 100644 --- a/app/api/routes/admin.py +++ b/app/api/routes/admin.py @@ -1,26 +1,411 @@ + from __future__ import annotations import os -from sanic import response -from sqlalchemy import select +import platform as py_platform +import shutil +from collections import defaultdict +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional +from base58 import b58encode +from sanic import response +from sqlalchemy import func, select + +from app.api.routes._system import get_git_info +from app.core._blockchain.ton.platform import platform +from app.core._config import ( + BACKEND_DATA_DIR_HOST, + BACKEND_LOGS_DIR_HOST, + LOG_DIR, + PROJECT_HOST, + UPLOADS_DIR, +) +from app.core._secrets import hot_pubkey, service_wallet +from app.core.ipfs_client import bitswap_stat, id_info, repo_stat +from app.core.logger import make_log +from app.core.models._config import ServiceConfig, ServiceConfigValue +from app.core.models.content_v3 import ( + ContentDerivative, + ContentIndexItem, + EncryptedContent, + IpfsSync, + UploadSession, +) from app.core.models.my_network import KnownNode -from app.core.models.content_v3 import EncryptedContent, IpfsSync, ContentDerivative -from app.core.models._config import ServiceConfig -from app.core.ipfs_client import bitswap_stat, repo_stat +from app.core.models.tasks import BlockchainTask + +ADMIN_COOKIE_NAME = os.getenv('ADMIN_COOKIE_NAME', 'admin_session') +ADMIN_COOKIE_MAX_AGE = int(os.getenv('ADMIN_COOKIE_MAX_AGE', '172800')) # 48h default +ADMIN_COOKIE_SAMESITE = os.getenv('ADMIN_COOKIE_SAMESITE', 'Lax') +ADMIN_COOKIE_SECURE_MODE = os.getenv('ADMIN_COOKIE_SECURE', 'auto').lower() + + +def _cookie_secure_flag(request) -> bool: + if ADMIN_COOKIE_SECURE_MODE == 'true': + return True + if ADMIN_COOKIE_SECURE_MODE == 'false': + return False + # auto mode: follow request scheme + return getattr(request, 'scheme', 'http') == 'https' + + +def _set_admin_cookie(resp, request, value: str, max_age: Optional[int] = None): + resp.cookies[ADMIN_COOKIE_NAME] = value + cookie = resp.cookies[ADMIN_COOKIE_NAME] + cookie['path'] = '/' + cookie['httponly'] = True + cookie['samesite'] = ADMIN_COOKIE_SAMESITE + cookie['secure'] = _cookie_secure_flag(request) + if max_age is not None: + cookie['max-age'] = max_age + + +def _clear_admin_cookie(resp, request): + _set_admin_cookie(resp, request, '', max_age=0) def _auth_ok(request) -> bool: token = os.getenv('ADMIN_API_TOKEN') if not token: return False - auth = request.headers.get('Authorization', '') - return auth.strip() == f"Bearer {token}" + cookie_value = request.cookies.get(ADMIN_COOKIE_NAME) + return cookie_value == token + + +def _unauthorized(): + return response.json({"error": "UNAUTHORIZED"}, status=401) + + +def _ensure_admin(request): + if not _auth_ok(request): + return _unauthorized() + return None + + +def _dir_stats(label: str, path: str) -> Dict[str, Any]: + target = Path(path) + exists = target.exists() + size = 0 + files = 0 + if exists: + if target.is_file(): + try: + stat = target.stat() + size = stat.st_size + files = 1 + except OSError: + pass + else: + for child in target.rglob('*'): + try: + if child.is_file(): + files += 1 + size += child.stat().st_size + except OSError: + continue + return { + 'label': label, + 'path': str(target), + 'exists': exists, + 'file_count': files, + 'size_bytes': size, + } + + +def _service_states(request) -> List[Dict[str, Any]]: + now = datetime.utcnow() + items: List[Dict[str, Any]] = [] + memory = getattr(request.app.ctx, 'memory', None) + known_states = getattr(memory, 'known_states', {}) if memory else {} + if isinstance(known_states, dict): + for name, payload in known_states.items(): + ts: Optional[datetime] = payload.get('timestamp') if isinstance(payload, dict) else None + delay = (now - ts).total_seconds() if ts else None + healthy = delay is not None and delay < 120 + items.append({ + 'name': name, + 'status': payload.get('status') if healthy else 'not working: timeout', + 'last_reported_seconds': delay, + }) + items.sort(key=lambda item: item['name']) + return items + + +async def s_api_v1_admin_login(request): + token = os.getenv('ADMIN_API_TOKEN') + if not token: + make_log('Admin', 'ADMIN_API_TOKEN is not configured', level='error') + return response.json({"error": "ADMIN_TOKEN_NOT_CONFIGURED"}, status=500) + + payload = request.json or {} + provided = (payload.get('secret') or '').strip() + if provided != token: + resp = response.json({"error": "UNAUTHORIZED"}, status=401) + _clear_admin_cookie(resp, request) + return resp + + resp = response.json({"ok": True}) + _set_admin_cookie(resp, request, token, ADMIN_COOKIE_MAX_AGE) + return resp + + +async def s_api_v1_admin_logout(request): + resp = response.json({"ok": True}) + _clear_admin_cookie(resp, request) + return resp + + +async def s_api_v1_admin_overview(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + branch, commit = get_git_info() + + try: + ipfs_identity = await id_info() + except Exception as exc: # pragma: no cover - network failure path + ipfs_identity = {"error": str(exc)} + + try: + bitswap = await bitswap_stat() + except Exception as exc: # pragma: no cover - network failure path + bitswap = {"error": str(exc)} + + try: + repo = await repo_stat() + except Exception as exc: # pragma: no cover - network failure path + repo = {"error": str(exc)} + + # Database counters + encrypted_total = (await session.execute(select(func.count()).select_from(EncryptedContent))).scalar_one() + upload_total = (await session.execute(select(func.count()).select_from(UploadSession))).scalar_one() + derivative_ready = (await session.execute( + select(func.count()).select_from(ContentDerivative).where(ContentDerivative.status == 'ready') + )).scalar_one() + + node_id = b58encode(hot_pubkey).decode() + + overview_payload = { + 'project': { + 'host': PROJECT_HOST, + 'name': os.getenv('PROJECT_NAME', 'unknown'), + 'privacy': os.getenv('NODE_PRIVACY', 'public'), + }, + 'codebase': { + 'branch': branch, + 'commit': commit, + }, + 'node': { + 'id': node_id, + 'service_wallet': service_wallet.address.to_string(1, 1, 1), + 'ton_master': platform.address.to_string(1, 1, 1), + }, + 'runtime': { + 'python': py_platform.python_version(), + 'implementation': py_platform.python_implementation(), + 'platform': py_platform.platform(), + 'utc_now': datetime.utcnow().isoformat() + 'Z', + }, + 'ipfs': { + 'identity': ipfs_identity, + 'bitswap': bitswap, + 'repo': repo, + }, + 'content': { + 'encrypted_total': int(encrypted_total or 0), + 'upload_sessions_total': int(upload_total or 0), + 'derivatives_ready': int(derivative_ready or 0), + }, + 'ton': { + 'host': os.getenv('TONCENTER_HOST'), + 'api_key_configured': bool(os.getenv('TONCENTER_API_KEY')), + 'testnet': bool(int(os.getenv('TESTNET', '0'))), + }, + 'services': _service_states(request), + } + + return response.json(overview_payload) + + +async def s_api_v1_admin_storage(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + directories: List[Dict[str, Any]] = [] + directories.append(_dir_stats('Encrypted uploads', UPLOADS_DIR)) + directories.append(_dir_stats('Backend logs', LOG_DIR)) + + extra_dirs = { + 'Host content mount': BACKEND_DATA_DIR_HOST, + 'Host logs mount': BACKEND_LOGS_DIR_HOST, + 'Tus staging': os.getenv('TUSD_DATA_DIR_HOST', ''), + } + for label, path in extra_dirs.items(): + if path: + directories.append(_dir_stats(label, path)) + + disk_snapshot: Optional[Dict[str, Any]] = None + for entry in directories: + if entry['exists']: + try: + usage = shutil.disk_usage(entry['path']) + except Exception: + continue + disk_snapshot = { + 'path': entry['path'], + 'total_bytes': usage.total, + 'used_bytes': usage.total - usage.free, + 'free_bytes': usage.free, + 'percent_used': round((usage.total - usage.free) / usage.total * 100, 2) if usage.total else None, + } + break + + derivatives = (await session.execute(select(ContentDerivative))).scalars().all() + derivative_stats = { + 'ready': sum(1 for d in derivatives if d.status == 'ready'), + 'processing': sum(1 for d in derivatives if d.status == 'processing'), + 'pending': sum(1 for d in derivatives if d.status == 'pending'), + 'failed': sum(1 for d in derivatives if d.status == 'failed'), + 'total_bytes': sum(int(d.size_bytes or 0) for d in derivatives if d.size_bytes), + } + + storage_payload = { + 'directories': directories, + 'disk': disk_snapshot, + 'derivatives': derivative_stats, + } + return response.json(storage_payload) + + +async def s_api_v1_admin_uploads(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + counts_rows = (await session.execute( + select(UploadSession.state, func.count()).group_by(UploadSession.state) + )).all() + counts = {state: int(count) for state, count in counts_rows} + total = sum(counts.values()) + + recent_rows = (await session.execute( + select(UploadSession).order_by(UploadSession.updated_at.desc()).limit(25) + )).scalars().all() + recent = [ + { + 'id': row.id, + 'filename': row.filename, + 'size_bytes': row.size_bytes, + 'state': row.state, + 'encrypted_cid': row.encrypted_cid, + 'error': row.error, + 'updated_at': row.updated_at.isoformat() + 'Z', + 'created_at': row.created_at.isoformat() + 'Z', + } + for row in recent_rows + ] + + payload = { + 'total': total, + 'states': counts, + 'recent': recent, + } + return response.json(payload) + + +async def s_api_v1_admin_system(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + config_rows = (await session.execute(select(ServiceConfigValue).order_by(ServiceConfigValue.key))).scalars().all() + config_payload = [ + {'key': row.key, 'value': row.value, 'raw': row.packed_value} + for row in config_rows + ] + + env_summary = { + 'PROJECT_NAME': os.getenv('PROJECT_NAME'), + 'PROJECT_HOST': PROJECT_HOST, + 'NODE_PRIVACY': os.getenv('NODE_PRIVACY'), + 'SANIC_PORT': os.getenv('SANIC_PORT'), + 'LOG_LEVEL': os.getenv('LOG_LEVEL'), + 'TESTNET': os.getenv('TESTNET'), + } + + blockchain_counts_rows = (await session.execute( + select(BlockchainTask.status, func.count()).group_by(BlockchainTask.status) + )).all() + blockchain_counts = {status: int(count) for status, count in blockchain_counts_rows} + + latest_index = (await session.execute( + select(ContentIndexItem).order_by(ContentIndexItem.updated_at.desc()).limit(5) + )).scalars().all() + index_entries = [ + { + 'encrypted_cid': item.encrypted_cid, + 'updated_at': item.updated_at.isoformat() + 'Z', + } + for item in latest_index + ] + + payload = { + 'env': env_summary, + 'service_config': config_payload, + 'services': _service_states(request), + 'blockchain_tasks': blockchain_counts, + 'latest_index_items': index_entries, + } + return response.json(payload) + + +async def s_api_v1_admin_blockchain(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + counts_rows = (await session.execute( + select(BlockchainTask.status, func.count()).group_by(BlockchainTask.status) + )).all() + counts = {status: int(count) for status, count in counts_rows} + + recent_rows = (await session.execute( + select(BlockchainTask).order_by(BlockchainTask.updated.desc()).limit(20) + )).scalars().all() + recent = [ + { + 'id': task.id, + 'destination': task.destination, + 'amount': task.amount, + 'status': task.status, + 'epoch': task.epoch, + 'seqno': task.seqno, + 'transaction_hash': task.transaction_hash, + 'updated': task.updated.isoformat() + 'Z', + } + for task in recent_rows + ] + + payload = { + 'counts': counts, + 'recent': recent, + } + return response.json(payload) async def s_api_v1_admin_node_setrole(request): - if not _auth_ok(request): - return response.json({"error": "UNAUTHORIZED"}, status=401) + if (unauth := _ensure_admin(request)): + return unauth + data = request.json or {} role = (data.get('role') or '').strip() if role not in ('trusted', 'read-only', 'deny'): @@ -45,8 +430,9 @@ async def s_api_v1_admin_node_setrole(request): async def s_api_v1_admin_nodes(request): - if not _auth_ok(request): - return response.json({"error": "UNAUTHORIZED"}, status=401) + if (unauth := _ensure_admin(request)): + return unauth + session = request.ctx.db_session rows = (await session.execute(select(KnownNode))).scalars().all() items = [] @@ -59,38 +445,37 @@ async def s_api_v1_admin_nodes(request): "role": meta.get('role') or 'read-only', "version": meta.get('version'), "last_seen": (r.last_sync.isoformat() + 'Z') if r.last_sync else None, + "notes": meta.get('notes'), }) return response.json({"items": items}) async def s_api_v1_admin_status(request): - if not _auth_ok(request): - return response.json({"error": "UNAUTHORIZED"}, status=401) + if (unauth := _ensure_admin(request)): + return unauth + session = request.ctx.db_session - # DB metrics - pin_counts = {k: 0 for k in ('not_pinned','queued','pinning','pinned','failed')} + pin_counts: Dict[str, int] = defaultdict(int) rows = (await session.execute(select(IpfsSync))).scalars().all() for r in rows: - pin_counts[r.pin_state] = pin_counts.get(r.pin_state, 0) + 1 + pin_counts[r.pin_state] += 1 deriv = (await session.execute(select(ContentDerivative))).scalars().all() deriv_counts = { - 'ready': sum(1 for d in deriv if d.status=='ready'), - 'processing': sum(1 for d in deriv if d.status=='processing'), - 'pending': sum(1 for d in deriv if d.status=='pending'), - 'failed': sum(1 for d in deriv if d.status=='failed'), + 'ready': sum(1 for d in deriv if d.status == 'ready'), + 'processing': sum(1 for d in deriv if d.status == 'processing'), + 'pending': sum(1 for d in deriv if d.status == 'pending'), + 'failed': sum(1 for d in deriv if d.status == 'failed'), } total_deriv_bytes = sum(int(d.size_bytes or 0) for d in deriv) - # Backlog: number of EC needing conversion ec = (await session.execute(select(EncryptedContent))).scalars().all() backlog = 0 for e in ec: if not e.preview_enabled: continue - kinds = [d.kind for d in deriv if d.content_id==e.id and d.status=='ready'] - req = {'decrypted_low','decrypted_high','decrypted_preview'} + kinds = [d.kind for d in deriv if d.content_id == e.id and d.status == 'ready'] + req = {'decrypted_low', 'decrypted_high', 'decrypted_preview'} if not req.issubset(set(kinds)): backlog += 1 - # IPFS metrics try: bs = await bitswap_stat() except Exception: @@ -99,15 +484,14 @@ async def s_api_v1_admin_status(request): rs = await repo_stat() except Exception: rs = {} - # Limits cfg = ServiceConfig(session) - max_gb = await cfg.get('DERIVATIVE_CACHE_MAX_GB', os.getenv('DERIVATIVE_CACHE_MAX_GB','50')) - ttl_days = await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS','0')) - max_pins = await cfg.get('SYNC_MAX_CONCURRENT_PINS', os.getenv('SYNC_MAX_CONCURRENT_PINS','4')) - disk_pct = await cfg.get('SYNC_DISK_LOW_WATERMARK_PCT', os.getenv('SYNC_DISK_LOW_WATERMARK_PCT','90')) + max_gb = await cfg.get('DERIVATIVE_CACHE_MAX_GB', os.getenv('DERIVATIVE_CACHE_MAX_GB', '50')) + ttl_days = await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0')) + max_pins = await cfg.get('SYNC_MAX_CONCURRENT_PINS', os.getenv('SYNC_MAX_CONCURRENT_PINS', '4')) + disk_pct = await cfg.get('SYNC_DISK_LOW_WATERMARK_PCT', os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90')) return response.json({ 'ipfs': {'bitswap': bs, 'repo': rs}, - 'pin_counts': pin_counts, + 'pin_counts': dict(pin_counts), 'derivatives': {**deriv_counts, 'total_bytes': total_deriv_bytes}, 'convert_backlog': backlog, 'limits': { @@ -120,8 +504,8 @@ async def s_api_v1_admin_status(request): async def s_api_v1_admin_cache_setlimits(request): - if not _auth_ok(request): - return response.json({"error": "UNAUTHORIZED"}, status=401) + if (unauth := _ensure_admin(request)): + return unauth data = request.json or {} max_gb = float(data.get('max_gb')) ttl_days = int(data.get('ttl_days')) @@ -132,21 +516,19 @@ async def s_api_v1_admin_cache_setlimits(request): async def s_api_v1_admin_cache_cleanup(request): - if not _auth_ok(request): - return response.json({"error": "UNAUTHORIZED"}, status=401) + if (unauth := _ensure_admin(request)): + return unauth data = request.json or {} mode = (data.get('mode') or 'fit') - # reuse janitor logic inline removed = 0 - from datetime import datetime, timedelta + from datetime import timedelta + session = request.ctx.db_session if mode == 'ttl': - # Evict by TTL - async with request.ctx.db_session as session: - from app.core.models._config import ServiceConfig - ttl = int(await ServiceConfig(session).get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS','0'))) + cfg = ServiceConfig(session) + ttl = int(await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0'))) if ttl > 0: now = datetime.utcnow() - rows = (await request.ctx.db_session.execute(select(ContentDerivative).where(ContentDerivative.status=='ready'))).scalars().all() + rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() for r in rows: la = r.last_access_at or r.created_at if la and (now - la) > timedelta(days=ttl): @@ -155,16 +537,18 @@ async def s_api_v1_admin_cache_cleanup(request): os.remove(r.local_path) except Exception: pass - r.status='pending'; r.local_path=None; r.size_bytes=None; r.last_access_at=None + r.status = 'pending' + r.local_path = None + r.size_bytes = None + r.last_access_at = None removed += 1 - await request.ctx.db_session.commit() + await session.commit() else: - # Fit to size target_gb = float(data.get('max_gb') or 0) if target_gb <= 0: return response.json({"error": "BAD_MAX_GB"}, status=400) - limit_bytes = int(target_gb * (1024**3)) - rows = (await request.ctx.db_session.execute(select(ContentDerivative).where(ContentDerivative.status=='ready'))).scalars().all() + limit_bytes = int(target_gb * (1024 ** 3)) + rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() rows.sort(key=lambda r: (r.last_access_at or r.created_at or datetime.utcfromtimestamp(0))) total = sum(int(r.size_bytes or 0) for r in rows) for r in rows: @@ -176,15 +560,18 @@ async def s_api_v1_admin_cache_cleanup(request): except Exception: pass total -= int(r.size_bytes or 0) - r.status='pending'; r.local_path=None; r.size_bytes=None; r.last_access_at=None + r.status = 'pending' + r.local_path = None + r.size_bytes = None + r.last_access_at = None removed += 1 - await request.ctx.db_session.commit() + await session.commit() return response.json({"ok": True, "removed": removed}) async def s_api_v1_admin_sync_setlimits(request): - if not _auth_ok(request): - return response.json({"error": "UNAUTHORIZED"}, status=401) + if (unauth := _ensure_admin(request)): + return unauth data = request.json or {} max_pins = int(data.get('max_concurrent_pins')) disk_pct = int(data.get('disk_low_watermark_pct')) diff --git a/app/core/ipfs_client.py b/app/core/ipfs_client.py index 7610af5..3333fbc 100644 --- a/app/core/ipfs_client.py +++ b/app/core/ipfs_client.py @@ -128,3 +128,10 @@ async def repo_stat() -> Dict[str, Any]: r = await client.post(f"{IPFS_API_URL}/api/v0/repo/stat") r.raise_for_status() return r.json() + + +async def id_info() -> Dict[str, Any]: + async with httpx.AsyncClient(timeout=10) as client: + r = await client.post(f"{IPFS_API_URL}/api/v0/id") + r.raise_for_status() + return r.json()