uploader-bot/app/api/routes/admin.py

1005 lines
36 KiB
Python

from __future__ import annotations
import os
import platform as py_platform
import shutil
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from base58 import b58encode
from sanic import response
from sqlalchemy import func, select
from app.api.routes._system import get_git_info
from app.core._blockchain.ton.platform import platform
from app.core._config import (
BACKEND_DATA_DIR_HOST,
BACKEND_LOGS_DIR_HOST,
LOG_DIR,
CLIENT_TELEGRAM_BOT_USERNAME,
PROJECT_HOST,
UPLOADS_DIR,
)
from app.core._secrets import hot_pubkey, service_wallet
from app.core.ipfs_client import bitswap_stat, id_info, repo_stat
from app.core.logger import make_log
from app.core.models._config import ServiceConfig, ServiceConfigValue
from app.core.models.content_v3 import (
ContentDerivative,
ContentIndexItem,
EncryptedContent,
IpfsSync,
UploadSession,
)
from app.core.models.my_network import KnownNode
from app.core.models.tasks import BlockchainTask
from app.core.models.node_storage import StoredContent
from app.core.models.user import User
from app.core.models.content.user_content import UserContent
from app.core._utils.share_links import build_content_links
from app.core.content.content_id import ContentId
MIN_ONCHAIN_INDEX = int(os.getenv("MIN_ONCHAIN_INDEX", "8"))
ALLOWED_UPLOAD_FILTERS = {"all", "issues", "processing", "ready", "unindexed"}
ADMIN_COOKIE_NAME = os.getenv('ADMIN_COOKIE_NAME', 'admin_session')
ADMIN_COOKIE_MAX_AGE = int(os.getenv('ADMIN_COOKIE_MAX_AGE', '172800')) # 48h default
ADMIN_COOKIE_SAMESITE = os.getenv('ADMIN_COOKIE_SAMESITE', 'Lax')
ADMIN_COOKIE_SECURE_MODE = os.getenv('ADMIN_COOKIE_SECURE', 'auto').lower()
ADMIN_HEADER_NAME = os.getenv('ADMIN_HEADER_NAME', 'X-Admin-Token')
def _cookie_secure_flag(request) -> bool:
if ADMIN_COOKIE_SECURE_MODE == 'true':
return True
if ADMIN_COOKIE_SECURE_MODE == 'false':
return False
# auto mode: follow request scheme
return getattr(request, 'scheme', 'http') == 'https'
def _set_admin_cookie(resp, request, value: str, max_age: Optional[int] = None):
resp.cookies[ADMIN_COOKIE_NAME] = value
cookie = resp.cookies[ADMIN_COOKIE_NAME]
cookie['path'] = '/'
cookie['httponly'] = True
cookie['samesite'] = ADMIN_COOKIE_SAMESITE
cookie['secure'] = _cookie_secure_flag(request)
if max_age is not None:
cookie['max-age'] = max_age
def _clear_admin_cookie(resp, request):
_set_admin_cookie(resp, request, '', max_age=0)
def _get_admin_header(request) -> Optional[str]:
target = ADMIN_HEADER_NAME.lower()
for key, value in request.headers.items():
if key.lower() == target:
return value
return None
def _auth_ok(request) -> bool:
token = os.getenv('ADMIN_API_TOKEN')
if not token:
return False
cookie_value = request.cookies.get(ADMIN_COOKIE_NAME)
if cookie_value == token:
return True
header_value = _get_admin_header(request)
if not header_value:
return False
if header_value.startswith('Bearer '):
header_value = header_value.split(' ', 1)[1].strip()
return header_value == token
def _unauthorized():
return response.json({"error": "UNAUTHORIZED"}, status=401)
def _ensure_admin(request):
if not _auth_ok(request):
return _unauthorized()
return None
def _dir_stats(label: str, path: str) -> Dict[str, Any]:
target = Path(path)
exists = target.exists()
size = 0
files = 0
if exists:
if target.is_file():
try:
stat = target.stat()
size = stat.st_size
files = 1
except OSError:
pass
else:
for child in target.rglob('*'):
try:
if child.is_file():
files += 1
size += child.stat().st_size
except OSError:
continue
return {
'label': label,
'path': str(target),
'exists': exists,
'file_count': files,
'size_bytes': size,
}
def _service_states(request) -> List[Dict[str, Any]]:
now = datetime.utcnow()
items: List[Dict[str, Any]] = []
memory = getattr(request.app.ctx, 'memory', None)
known_states = getattr(memory, 'known_states', {}) if memory else {}
if isinstance(known_states, dict):
for name, payload in known_states.items():
ts: Optional[datetime] = payload.get('timestamp') if isinstance(payload, dict) else None
delay = (now - ts).total_seconds() if ts else None
healthy = delay is not None and delay < 120
items.append({
'name': name,
'status': payload.get('status') if healthy else 'not working: timeout',
'last_reported_seconds': delay,
})
items.sort(key=lambda item: item['name'])
return items
def _format_dt(value: Optional[datetime]) -> Optional[str]:
return value.isoformat() + 'Z' if isinstance(value, datetime) else None
def _extract_file_hash(local_path: Optional[str]) -> Optional[str]:
if not local_path:
return None
name = Path(local_path).name
return name or None
def _storage_download_url(file_hash: Optional[str]) -> Optional[str]:
if not file_hash:
return None
return f"{PROJECT_HOST}/api/v1.5/storage/{file_hash}"
def _pick_primary_download(candidates: List[tuple[str, Optional[str], Optional[int]]]) -> Optional[str]:
priority = (
'decrypted_high',
'decrypted_low',
'decrypted_preview',
'high',
'low',
'preview',
)
for target in priority:
for kind, url, _ in candidates:
if kind == target and url:
return url
for _, url, _ in candidates:
if url:
return url
return None
async def s_api_v1_admin_login(request):
token = os.getenv('ADMIN_API_TOKEN')
if not token:
make_log('Admin', 'ADMIN_API_TOKEN is not configured', level='error')
return response.json({"error": "ADMIN_TOKEN_NOT_CONFIGURED"}, status=500)
payload = request.json or {}
provided = (payload.get('secret') or '').strip()
if provided != token:
resp = response.json({"error": "UNAUTHORIZED"}, status=401)
_clear_admin_cookie(resp, request)
return resp
resp = response.json({
"ok": True,
"cookie_name": ADMIN_COOKIE_NAME,
"header_name": ADMIN_HEADER_NAME,
"max_age": ADMIN_COOKIE_MAX_AGE,
})
_set_admin_cookie(resp, request, token, ADMIN_COOKIE_MAX_AGE)
return resp
async def s_api_v1_admin_logout(request):
resp = response.json({"ok": True})
_clear_admin_cookie(resp, request)
return resp
async def s_api_v1_admin_overview(request):
if (unauth := _ensure_admin(request)):
return unauth
session = request.ctx.db_session
branch, commit = get_git_info()
try:
ipfs_identity = await id_info()
except Exception as exc: # pragma: no cover - network failure path
ipfs_identity = {"error": str(exc)}
try:
bitswap = await bitswap_stat()
except Exception as exc: # pragma: no cover - network failure path
bitswap = {"error": str(exc)}
try:
repo = await repo_stat()
except Exception as exc: # pragma: no cover - network failure path
repo = {"error": str(exc)}
# Database counters
encrypted_total = (await session.execute(select(func.count()).select_from(EncryptedContent))).scalar_one()
upload_total = (await session.execute(select(func.count()).select_from(UploadSession))).scalar_one()
derivative_ready = (await session.execute(
select(func.count()).select_from(ContentDerivative).where(ContentDerivative.status == 'ready')
)).scalar_one()
node_id = b58encode(hot_pubkey).decode()
overview_payload = {
'project': {
'host': PROJECT_HOST,
'name': os.getenv('PROJECT_NAME', 'unknown'),
'privacy': os.getenv('NODE_PRIVACY', 'public'),
},
'codebase': {
'branch': branch,
'commit': commit,
},
'node': {
'id': node_id,
'service_wallet': service_wallet.address.to_string(1, 1, 1),
'ton_master': platform.address.to_string(1, 1, 1),
},
'runtime': {
'python': py_platform.python_version(),
'implementation': py_platform.python_implementation(),
'platform': py_platform.platform(),
'utc_now': datetime.utcnow().isoformat() + 'Z',
},
'ipfs': {
'identity': ipfs_identity,
'bitswap': bitswap,
'repo': repo,
},
'content': {
'encrypted_total': int(encrypted_total or 0),
'upload_sessions_total': int(upload_total or 0),
'derivatives_ready': int(derivative_ready or 0),
},
'ton': {
'host': os.getenv('TONCENTER_HOST'),
'api_key_configured': bool(os.getenv('TONCENTER_API_KEY')),
'testnet': bool(int(os.getenv('TESTNET', '0'))),
},
'services': _service_states(request),
}
return response.json(overview_payload)
async def s_api_v1_admin_storage(request):
if (unauth := _ensure_admin(request)):
return unauth
session = request.ctx.db_session
directories: List[Dict[str, Any]] = []
directories.append(_dir_stats('Encrypted uploads', UPLOADS_DIR))
directories.append(_dir_stats('Backend logs', LOG_DIR))
extra_dirs = {
'Host content mount': BACKEND_DATA_DIR_HOST,
'Host logs mount': BACKEND_LOGS_DIR_HOST,
'Tus staging': os.getenv('TUSD_DATA_DIR_HOST', ''),
}
for label, path in extra_dirs.items():
if path:
directories.append(_dir_stats(label, path))
disk_snapshot: Optional[Dict[str, Any]] = None
for entry in directories:
if entry['exists']:
try:
usage = shutil.disk_usage(entry['path'])
except Exception:
continue
disk_snapshot = {
'path': entry['path'],
'total_bytes': usage.total,
'used_bytes': usage.total - usage.free,
'free_bytes': usage.free,
'percent_used': round((usage.total - usage.free) / usage.total * 100, 2) if usage.total else None,
}
break
derivatives = (await session.execute(select(ContentDerivative))).scalars().all()
derivative_stats = {
'ready': sum(1 for d in derivatives if d.status == 'ready'),
'processing': sum(1 for d in derivatives if d.status == 'processing'),
'pending': sum(1 for d in derivatives if d.status == 'pending'),
'failed': sum(1 for d in derivatives if d.status == 'failed'),
'total_bytes': sum(int(d.size_bytes or 0) for d in derivatives if d.size_bytes),
}
storage_payload = {
'directories': directories,
'disk': disk_snapshot,
'derivatives': derivative_stats,
}
return response.json(storage_payload)
async def s_api_v1_admin_uploads(request):
if (unauth := _ensure_admin(request)):
return unauth
session = request.ctx.db_session
raw_filter = (request.args.get('filter') or '').lower()
raw_filters: List[str] = []
if raw_filter:
raw_filters = [item.strip() for item in raw_filter.split(',') if item.strip()]
effective_filters = [item for item in raw_filters if item in ALLOWED_UPLOAD_FILTERS and item != 'all']
search_query = (request.args.get('search') or '').strip()
search_lower = search_query.lower()
try:
limit = int(request.args.get('limit') or 50)
except Exception:
limit = 50
limit = max(1, min(limit, 200))
try:
scan_limit = int(request.args.get('scan') or 0)
except Exception:
scan_limit = 0
if scan_limit <= 0:
scan_default = max(limit, 100 if (effective_filters or search_lower) else limit)
scan_limit = min(max(scan_default, limit), 500)
else:
scan_limit = max(limit, min(scan_limit, 500))
counts_rows = (await session.execute(
select(UploadSession.state, func.count()).group_by(UploadSession.state)
)).all()
counts = {state: int(count) for state, count in counts_rows}
total = sum(counts.values())
recent_rows = (await session.execute(
select(UploadSession).order_by(UploadSession.updated_at.desc()).limit(25)
)).scalars().all()
recent = [
{
'id': row.id,
'filename': row.filename,
'size_bytes': row.size_bytes,
'state': row.state,
'encrypted_cid': row.encrypted_cid,
'error': row.error,
'updated_at': row.updated_at.isoformat() + 'Z',
'created_at': row.created_at.isoformat() + 'Z',
}
for row in recent_rows
]
content_rows = (await session.execute(
select(EncryptedContent).order_by(EncryptedContent.created_at.desc()).limit(scan_limit)
)).scalars().all()
content_ids = [row.id for row in content_rows]
encrypted_cids = [row.encrypted_cid for row in content_rows]
derivatives_map: Dict[int, list[ContentDerivative]] = {cid: [] for cid in content_ids}
if content_ids:
derivative_rows = (await session.execute(
select(ContentDerivative).where(ContentDerivative.content_id.in_(content_ids))
)).scalars().all()
for derivative in derivative_rows:
derivatives_map.setdefault(derivative.content_id, []).append(derivative)
ipfs_map: Dict[int, Optional[IpfsSync]] = {}
if content_ids:
ipfs_rows = (await session.execute(
select(IpfsSync).where(IpfsSync.content_id.in_(content_ids))
)).scalars().all()
for sync in ipfs_rows:
ipfs_map[sync.content_id] = sync
uploads_map: Dict[str, List[UploadSession]] = {cid: [] for cid in encrypted_cids}
if encrypted_cids:
uploads_for_content = (await session.execute(
select(UploadSession).where(UploadSession.encrypted_cid.in_(encrypted_cids))
)).scalars().all()
for upload in uploads_for_content:
uploads_map.setdefault(upload.encrypted_cid, []).append(upload)
for chain in uploads_map.values():
chain.sort(key=lambda u: (u.updated_at or u.created_at or datetime.min))
stored_map: Dict[str, StoredContent] = {}
stored_by_id: Dict[int, StoredContent] = {}
if encrypted_cids:
stored_rows = (await session.execute(
select(StoredContent).where(StoredContent.content_id.in_(encrypted_cids))
)).scalars().all()
for stored in stored_rows:
stored_map[stored.content_id] = stored
stored_by_id[stored.id] = stored
user_map: Dict[int, User] = {}
user_ids = {stored.user_id for stored in stored_map.values() if stored.user_id}
if user_ids:
user_rows = (await session.execute(select(User).where(User.id.in_(user_ids)))).scalars().all()
for user in user_rows:
user_map[user.id] = user
license_counts: Dict[int, int] = {}
stored_ids = list(stored_by_id.keys())
if stored_ids:
license_rows = (await session.execute(
select(UserContent.content_id, func.count())
.where(UserContent.content_id.in_(stored_ids))
.group_by(UserContent.content_id)
)).all()
for content_id, count in license_rows:
license_counts[int(content_id)] = int(count)
contents_payload: List[Dict[str, Any]] = []
category_totals: Dict[str, int] = {key: 0 for key in ALLOWED_UPLOAD_FILTERS if key != 'all'}
matched_total = 0
for content in content_rows:
derivatives = derivatives_map.get(content.id, [])
attempts: Dict[str, int] = defaultdict(int)
derivative_entries: List[Dict[str, Any]] = []
summary: Dict[str, int] = defaultdict(int)
download_candidates: List[tuple[str, Optional[str], Optional[int]]] = []
for derivative in sorted(derivatives, key=lambda item: item.created_at or datetime.min):
summary[derivative.status] += 1
attempts[derivative.kind] += 1
file_hash = _extract_file_hash(derivative.local_path)
download_url = _storage_download_url(file_hash)
derivative_entries.append({
'kind': derivative.kind,
'status': derivative.status,
'size_bytes': derivative.size_bytes,
'error': derivative.error,
'created_at': _format_dt(derivative.created_at),
'updated_at': _format_dt(derivative.last_access_at or derivative.created_at),
'attempts': attempts[derivative.kind],
'download_url': download_url,
})
download_candidates.append((derivative.kind, download_url, derivative.size_bytes))
conversion_state = None
if summary.get('ready'):
conversion_state = 'ready'
elif summary.get('processing'):
conversion_state = 'processing'
elif summary.get('pending'):
conversion_state = 'pending'
elif summary.get('failed'):
conversion_state = 'failed'
upload_chain = uploads_map.get(content.encrypted_cid, [])
latest_upload = upload_chain[-1] if upload_chain else None
upload_history = [
{
'state': entry.state,
'at': _format_dt(entry.updated_at or entry.created_at),
'error': entry.error,
'filename': entry.filename,
}
for entry in upload_chain
]
ipfs_sync = ipfs_map.get(content.id)
stored = stored_map.get(content.encrypted_cid)
metadata_cid = None
content_hash = None
stored_payload: Optional[Dict[str, Any]] = None
blockchain_payload: Optional[Dict[str, Any]] = None
if stored:
metadata_cid = (stored.meta or {}).get('metadata_cid')
content_hash = stored.hash
download_candidates.append(('stored', stored.web_url, None))
stored_payload = {
'stored_id': stored.id,
'type': stored.type,
'owner_address': stored.owner_address,
'user_id': stored.user_id,
'status': stored.status,
'content_url': stored.web_url,
'download_url': stored.web_url,
'created': _format_dt(stored.created),
'updated': _format_dt(stored.updated),
}
if stored.user_id and stored.user_id in user_map:
user = user_map[stored.user_id]
stored_payload['user'] = {
'id': user.id,
'telegram_id': user.telegram_id,
'username': user.username,
'first_name': (user.meta or {}).get('first_name') if user.meta else None,
'last_name': (user.meta or {}).get('last_name') if user.meta else None,
}
blockchain_payload = {
'onchain_index': stored.onchain_index,
'item_address': (stored.meta or {}).get('item_address'),
'indexed': stored.onchain_index is not None and stored.onchain_index >= MIN_ONCHAIN_INDEX,
'license_count': license_counts.get(stored.id, 0),
}
else:
try:
cid_obj = ContentId.deserialize(content.encrypted_cid)
content_hash = cid_obj.content_hash_b58
except Exception:
content_hash = None
share_target = None
if stored:
try:
share_target = stored.cid.serialize_v2()
except Exception:
share_target = content.encrypted_cid
else:
share_target = content.encrypted_cid
_, startapp_url, web_view_url = build_content_links(
share_target,
None,
project_host=PROJECT_HOST,
bot_username=CLIENT_TELEGRAM_BOT_USERNAME,
)
primary_download = _pick_primary_download(download_candidates)
derivative_downloads = [
{
'kind': kind,
'url': url,
'size_bytes': size_bytes,
}
for kind, url, size_bytes in download_candidates
if url
]
upload_state_norm = (latest_upload.state or '').lower() if latest_upload else ''
conversion_state_norm = (conversion_state or '').lower() if conversion_state else ''
ipfs_state_norm = (ipfs_sync.pin_state or '').lower() if (ipfs_sync and ipfs_sync.pin_state) else ''
derivative_states_norm = [(derivative.status or '').lower() for derivative in derivatives]
status_values = [upload_state_norm, conversion_state_norm, ipfs_state_norm] + derivative_states_norm
has_issue = any(
value and ("fail" in value or "error" in value or "timeout" in value)
for value in status_values
)
if not has_issue and any(event.get('error') for event in upload_history):
has_issue = True
if not has_issue and any(derivative.error for derivative in derivatives):
has_issue = True
if not has_issue and ipfs_sync and ipfs_sync.pin_error:
has_issue = True
is_onchain_indexed = bool(blockchain_payload and blockchain_payload.get('indexed'))
is_unindexed = not is_onchain_indexed
conversion_done = (
summary.get('ready', 0) > 0
or conversion_state_norm in ('ready', 'converted')
or any(state in ('ready', 'converted', 'complete') for state in derivative_states_norm)
)
ipfs_done = ipfs_state_norm in ('pinned', 'ready')
is_ready = not has_issue and conversion_done and (ipfs_done or not ipfs_sync) and is_onchain_indexed
processing_tokens = ('process', 'pending', 'queue', 'upload', 'pin', 'sync')
has_processing_keywords = any(
value and any(token in value for token in processing_tokens)
for value in status_values
)
categories = set()
if has_issue:
categories.add('issues')
if is_ready:
categories.add('ready')
if is_unindexed:
categories.add('unindexed')
is_processing = not is_ready and not has_issue and has_processing_keywords
if is_processing:
categories.add('processing')
if not is_ready and not has_issue and 'processing' not in categories:
categories.add('processing')
flags = {
'issues': 'issues' in categories,
'processing': 'processing' in categories,
'ready': 'ready' in categories,
'unindexed': 'unindexed' in categories,
}
search_parts: List[Any] = [
content.title,
content.description,
content.encrypted_cid,
metadata_cid,
content_hash,
]
if blockchain_payload:
search_parts.append(blockchain_payload.get('item_address') or '')
if stored_payload:
search_parts.append(stored_payload.get('owner_address') or '')
user_info = stored_payload.get('user') or {}
search_parts.extend(
[
str(user_info.get('id') or ''),
str(user_info.get('telegram_id') or ''),
user_info.get('username') or '',
user_info.get('first_name') or '',
user_info.get('last_name') or '',
]
)
search_blob = ' '.join(str(part) for part in search_parts if part).lower()
matches_filter = (not effective_filters) or any(cat in categories for cat in effective_filters)
matches_search = (not search_lower) or (search_lower in search_blob)
if not matches_filter or not matches_search:
continue
matched_total += 1
for cat in categories:
if cat in category_totals:
category_totals[cat] += 1
if len(contents_payload) >= limit:
continue
contents_payload.append({
'encrypted_cid': content.encrypted_cid,
'metadata_cid': metadata_cid,
'content_hash': content_hash,
'title': content.title,
'description': content.description,
'content_type': content.content_type,
'size': {
'encrypted': content.enc_size_bytes,
'plain': content.plain_size_bytes,
},
'created_at': _format_dt(content.created_at),
'updated_at': _format_dt(content.updated_at),
'status': {
'upload_state': latest_upload.state if latest_upload else None,
'conversion_state': conversion_state,
'ipfs_state': ipfs_sync.pin_state if ipfs_sync else None,
'onchain': blockchain_payload,
},
'upload_history': upload_history,
'derivative_summary': dict(summary),
'derivatives': derivative_entries,
'ipfs': (
{
'pin_state': ipfs_sync.pin_state,
'pin_error': ipfs_sync.pin_error,
'bytes_total': ipfs_sync.bytes_total,
'bytes_fetched': ipfs_sync.bytes_fetched,
'pinned_at': _format_dt(ipfs_sync.pinned_at),
'updated_at': _format_dt(ipfs_sync.updated_at),
}
if ipfs_sync else None
),
'stored': stored_payload,
'links': {
'web_view': web_view_url,
'start_app': startapp_url,
'api_view': f"{PROJECT_HOST}/api/v1/content.view/{share_target}",
'download_primary': primary_download,
'download_derivatives': derivative_downloads,
},
'flags': flags,
})
payload = {
'total': total,
'states': counts,
'recent': recent,
'contents': contents_payload,
'matching_total': matched_total,
'filter': effective_filters or ['all'],
'search': search_query or None,
'limit': limit,
'scan': scan_limit,
'scanned': len(content_rows),
'category_totals': category_totals,
}
return response.json(payload)
async def s_api_v1_admin_system(request):
if (unauth := _ensure_admin(request)):
return unauth
session = request.ctx.db_session
config_rows = (await session.execute(select(ServiceConfigValue).order_by(ServiceConfigValue.key))).scalars().all()
config_payload = []
for row in config_rows:
key_lower = (row.key or '').lower()
masked = ('private' in key_lower and 'key' in key_lower) or ('seed' in key_lower)
config_payload.append({
'key': row.key,
'value': '*** hidden ***' if masked else row.value,
'raw': None if masked else row.packed_value,
})
env_summary = {
'PROJECT_NAME': os.getenv('PROJECT_NAME'),
'PROJECT_HOST': PROJECT_HOST,
'NODE_PRIVACY': os.getenv('NODE_PRIVACY'),
'SANIC_PORT': os.getenv('SANIC_PORT'),
'LOG_LEVEL': os.getenv('LOG_LEVEL'),
'TESTNET': os.getenv('TESTNET'),
}
blockchain_counts_rows = (await session.execute(
select(BlockchainTask.status, func.count()).group_by(BlockchainTask.status)
)).all()
blockchain_counts = {status: int(count) for status, count in blockchain_counts_rows}
latest_index = (await session.execute(
select(ContentIndexItem).order_by(ContentIndexItem.updated_at.desc()).limit(5)
)).scalars().all()
index_entries = [
{
'encrypted_cid': item.encrypted_cid,
'updated_at': item.updated_at.isoformat() + 'Z',
}
for item in latest_index
]
payload = {
'env': env_summary,
'service_config': config_payload,
'services': _service_states(request),
'blockchain_tasks': blockchain_counts,
'latest_index_items': index_entries,
}
return response.json(payload)
async def s_api_v1_admin_blockchain(request):
if (unauth := _ensure_admin(request)):
return unauth
session = request.ctx.db_session
counts_rows = (await session.execute(
select(BlockchainTask.status, func.count()).group_by(BlockchainTask.status)
)).all()
counts = {status: int(count) for status, count in counts_rows}
recent_rows = (await session.execute(
select(BlockchainTask).order_by(BlockchainTask.updated.desc()).limit(20)
)).scalars().all()
recent = [
{
'id': task.id,
'destination': task.destination,
'amount': task.amount,
'status': task.status,
'epoch': task.epoch,
'seqno': task.seqno,
'transaction_hash': task.transaction_hash,
'updated': task.updated.isoformat() + 'Z',
}
for task in recent_rows
]
payload = {
'counts': counts,
'recent': recent,
}
return response.json(payload)
async def s_api_v1_admin_node_setrole(request):
if (unauth := _ensure_admin(request)):
return unauth
data = request.json or {}
role = (data.get('role') or '').strip()
if role not in ('trusted', 'read-only', 'deny'):
return response.json({"error": "BAD_ROLE"}, status=400)
pub = (data.get('public_key') or '').strip()
host = (data.get('host') or '').strip()
if not pub and not host:
return response.json({"error": "MISSING_TARGET"}, status=400)
session = request.ctx.db_session
row = None
if pub:
row = (await session.execute(select(KnownNode).where(KnownNode.public_key == pub))).scalars().first()
if not row and host:
row = (await session.execute(select(KnownNode).where(KnownNode.ip == host))).scalars().first()
if not row:
return response.json({"error": "NOT_FOUND"}, status=404)
meta = row.meta or {}
meta['role'] = role
row.meta = meta
await session.commit()
return response.json({"ok": True, "node": {"ip": row.ip, "public_key": row.public_key, "role": role}})
async def s_api_v1_admin_nodes(request):
if (unauth := _ensure_admin(request)):
return unauth
session = request.ctx.db_session
rows = (await session.execute(select(KnownNode))).scalars().all()
items = []
for r in rows:
meta = r.meta or {}
items.append({
"ip": r.ip,
"port": r.port,
"public_key": r.public_key,
"role": meta.get('role') or 'read-only',
"version": meta.get('version'),
"last_seen": (r.last_sync.isoformat() + 'Z') if r.last_sync else None,
"notes": meta.get('notes'),
})
return response.json({"items": items})
async def s_api_v1_admin_status(request):
if (unauth := _ensure_admin(request)):
return unauth
session = request.ctx.db_session
pin_counts: Dict[str, int] = defaultdict(int)
rows = (await session.execute(select(IpfsSync))).scalars().all()
for r in rows:
pin_counts[r.pin_state] += 1
deriv = (await session.execute(select(ContentDerivative))).scalars().all()
deriv_counts = {
'ready': sum(1 for d in deriv if d.status == 'ready'),
'processing': sum(1 for d in deriv if d.status == 'processing'),
'pending': sum(1 for d in deriv if d.status == 'pending'),
'failed': sum(1 for d in deriv if d.status == 'failed'),
}
total_deriv_bytes = sum(int(d.size_bytes or 0) for d in deriv)
ec = (await session.execute(select(EncryptedContent))).scalars().all()
backlog = 0
for e in ec:
if not e.preview_enabled:
continue
kinds = [d.kind for d in deriv if d.content_id == e.id and d.status == 'ready']
req = {'decrypted_low', 'decrypted_high', 'decrypted_preview'}
if not req.issubset(set(kinds)):
backlog += 1
try:
bs = await bitswap_stat()
except Exception:
bs = {}
try:
rs = await repo_stat()
except Exception:
rs = {}
cfg = ServiceConfig(session)
max_gb = await cfg.get('DERIVATIVE_CACHE_MAX_GB', os.getenv('DERIVATIVE_CACHE_MAX_GB', '50'))
ttl_days = await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0'))
max_pins = await cfg.get('SYNC_MAX_CONCURRENT_PINS', os.getenv('SYNC_MAX_CONCURRENT_PINS', '4'))
disk_pct = await cfg.get('SYNC_DISK_LOW_WATERMARK_PCT', os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90'))
return response.json({
'ipfs': {'bitswap': bs, 'repo': rs},
'pin_counts': dict(pin_counts),
'derivatives': {**deriv_counts, 'total_bytes': total_deriv_bytes},
'convert_backlog': backlog,
'limits': {
'DERIVATIVE_CACHE_MAX_GB': float(max_gb),
'DERIVATIVE_CACHE_TTL_DAYS': int(ttl_days),
'SYNC_MAX_CONCURRENT_PINS': int(max_pins),
'SYNC_DISK_LOW_WATERMARK_PCT': int(disk_pct),
}
})
async def s_api_v1_admin_cache_setlimits(request):
if (unauth := _ensure_admin(request)):
return unauth
data = request.json or {}
max_gb = float(data.get('max_gb'))
ttl_days = int(data.get('ttl_days'))
cfg = ServiceConfig(request.ctx.db_session)
await cfg.set('DERIVATIVE_CACHE_MAX_GB', max_gb)
await cfg.set('DERIVATIVE_CACHE_TTL_DAYS', ttl_days)
return response.json({"ok": True})
async def s_api_v1_admin_cache_cleanup(request):
if (unauth := _ensure_admin(request)):
return unauth
data = request.json or {}
mode = (data.get('mode') or 'fit')
removed = 0
from datetime import timedelta
session = request.ctx.db_session
if mode == 'ttl':
cfg = ServiceConfig(session)
ttl = int(await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '0')))
if ttl > 0:
now = datetime.utcnow()
rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all()
for r in rows:
la = r.last_access_at or r.created_at
if la and (now - la) > timedelta(days=ttl):
try:
if r.local_path and os.path.exists(r.local_path):
os.remove(r.local_path)
except Exception:
pass
r.status = 'pending'
r.local_path = None
r.size_bytes = None
r.last_access_at = None
removed += 1
await session.commit()
else:
target_gb = float(data.get('max_gb') or 0)
if target_gb <= 0:
return response.json({"error": "BAD_MAX_GB"}, status=400)
limit_bytes = int(target_gb * (1024 ** 3))
rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all()
rows.sort(key=lambda r: (r.last_access_at or r.created_at or datetime.utcfromtimestamp(0)))
total = sum(int(r.size_bytes or 0) for r in rows)
for r in rows:
if total <= limit_bytes:
break
try:
if r.local_path and os.path.exists(r.local_path):
os.remove(r.local_path)
except Exception:
pass
total -= int(r.size_bytes or 0)
r.status = 'pending'
r.local_path = None
r.size_bytes = None
r.last_access_at = None
removed += 1
await session.commit()
return response.json({"ok": True, "removed": removed})
async def s_api_v1_admin_sync_setlimits(request):
if (unauth := _ensure_admin(request)):
return unauth
data = request.json or {}
max_pins = int(data.get('max_concurrent_pins'))
disk_pct = int(data.get('disk_low_watermark_pct'))
cfg = ServiceConfig(request.ctx.db_session)
await cfg.set('SYNC_MAX_CONCURRENT_PINS', max_pins)
await cfg.set('SYNC_DISK_LOW_WATERMARK_PCT', disk_pct)
return response.json({"ok": True})