from __future__ import annotations

import base64
import json
import os
from datetime import datetime
from typing import Dict, Any

import aiofiles
from base58 import b58encode
from sanic import response
from sqlalchemy import select

from app.core._config import UPLOADS_DIR, PROJECT_HOST
from app.core._secrets import hot_pubkey
from app.core.crypto.aes_gcm_stream import encrypt_file_to_encf, CHUNK_BYTES
from app.core.crypto.keywrap import wrap_dek, KeyWrapError
from app.core.ipfs_client import add_streamed_file
from app.core.logger import make_log
from app.core.models.content_v3 import EncryptedContent, ContentKey, IpfsSync, ContentIndexItem, UploadSession
from app.core.models.node_storage import StoredContent
from app.core.storage import db_session
from app.core._utils.resolve_content import resolve_content
from app.core.events.service import record_event


def _b64(s: bytes) -> str:
    """Return *s* encoded as standard Base64 text."""
    return base64.b64encode(s).decode()


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _int_or_none(value: Any):
    """Coerce *value* to int (falsy values become 0); return None if it cannot be parsed."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return None


def _request_user(request):
    """Return the user object stored on ``request.ctx``, or None when there is none."""
    return getattr(getattr(request, "ctx", None), "user", None)


def _parse_hook_payload(request) -> Dict[str, Any]:
    """Return the hook body as a dict, tolerating missing, malformed or non-object JSON."""
    try:
        payload = request.json
    except Exception:
        payload = None
    if payload is None:
        # Fall back to decoding the raw body ourselves.
        raw_body = request.body or b''
        try:
            payload = json.loads(raw_body) if raw_body else {}
        except Exception:
            payload = {}
    # A JSON list/string/number would break every ``.get`` below; treat it as empty.
    return _as_dict(payload)


async def _mark_upload_failed(upload_id, error) -> None:
    """Set the UploadSession identified by *upload_id* to state 'failed' with *error* as text."""
    if not upload_id:
        return
    async with db_session() as session:
        us = await session.get(UploadSession, upload_id)
        if us:
            us.state = 'failed'
            us.error = str(error)
            await session.commit()


def _sign_index_item(item: Dict[str, Any]):
    """Sign the canonical JSON form of *item*; return "" when signing is not possible."""
    try:
        # Imported lazily (as in the original code), so an import failure also
        # degrades to an unsigned index item instead of failing the upload.
        from app.core._crypto.signer import Signer
        from app.core._secrets import hot_seed

        signer = Signer(hot_seed)
        blob = json.dumps(item, sort_keys=True, separators=(",", ":")).encode()
        return signer.sign(blob)
    except Exception as e:
        make_log("tus-hook", f"Index item signing failed; storing unsigned item: {e}", level="warning")
        return ""


async def s_api_v1_upload_tus_hook(request):
    """
    tusd HTTP hook endpoint.
    We mainly handle post-finish to: encrypt -> IPFS add+pin -> record DB.

    The event name is read from the JSON body (several key spellings are
    accepted) or from the ``Hook-Name`` header. Any event other than
    ``post-finish`` / ``postfinish`` is acknowledged and ignored.

    Responses:
      * ``{"ok": True, "skipped": True}`` - no event name could be determined.
      * ``{"ok": True}`` - event other than post-finish.
      * 400 ``NO_STORAGE_PATH`` - payload carries no storage path.
      * 500 ``KEY_WRAP_FAILED`` / ``ENCRYPT_ADD_FAILED`` / ``INVALID_ENCRYPTED_CID`` -
        processing failed; the upload session (if any) is marked 'failed'.
      * ``{"ok": True, "encrypted_cid": ..., "upload_id": ...}`` - success.
    """
    payload = _parse_hook_payload(request)

    event = (
        payload.get("Type") or payload.get("type")
        or payload.get("Event") or payload.get("event")
        or payload.get("Hook") or payload.get("hook")
        or payload.get("HookName") or payload.get("hook_name")
        or request.headers.get("Hook-Name") or request.headers.get("hook-name")
    )
    upload = _as_dict(payload.get("Upload") or payload.get("upload"))

    if not event:
        hook_name = (
            payload.get("HookName") or payload.get("hook")
            or payload.get("hook_name") or request.headers.get("Hook-Name")
        )
        raw = request.body or b''
        preview = raw[:512]
        make_log("tus-hook", f"Missing event type in hook payload; ignoring (hook={hook_name}, keys={list(payload.keys())}, raw={preview!r})", level="warning")
        return response.json({"ok": True, "skipped": True})

    if event not in ("post-finish", "postfinish"):
        # accept but ignore other events
        return response.json({"ok": True})

    # Extract storage path from tusd payload
    storage = _as_dict(upload.get("Storage"))
    file_path = storage.get("Path") or storage.get("path")
    if not file_path:
        return response.json({"ok": False, "error": "NO_STORAGE_PATH"}, status=400)

    meta = _as_dict(upload.get("MetaData"))
    # Common metadata keys
    title = meta.get("title") or meta.get("Title") or meta.get("name") or "Untitled"
    artist = (meta.get("artist") or meta.get("Artist") or "").strip()
    description = meta.get("description") or meta.get("Description") or ""
    content_type = meta.get("content_type") or meta.get("Content-Type") or "application/octet-stream"
    preview_enabled = content_type.startswith(("audio/", "video/"))
    # Optional preview window overrides from tus metadata; if either value is
    # unparsable, both fall back to the defaults.
    try:
        start_ms = int(meta.get("preview_start_ms") or 0)
        dur_ms = int(meta.get("preview_duration_ms") or 30000)
    except (TypeError, ValueError, OverflowError):
        start_ms, dur_ms = 0, 30000

    # Record/Update upload session
    upload_id = upload.get("ID") or upload.get("Id") or upload.get("id")
    size = _int_or_none(upload.get("Size"))

    async with db_session() as session:
        us = (await session.get(UploadSession, upload_id)) if upload_id else None
        if not us and upload_id:
            us = UploadSession(
                id=upload_id,
                filename=os.path.basename(file_path),
                size_bytes=size,
                state='processing',
                encrypted_cid=None,
            )
            session.add(us)
            await session.commit()

    # Read & encrypt by streaming (ENCF v1 / AES-GCM)
    # Generate per-content random DEK and salt
    dek = os.urandom(32)
    salt = os.urandom(16)
    key_fpr = b58encode(hot_pubkey).decode()  # fingerprint as our node id for now

    # Wrap the DEK first so a key-wrap failure is detected before any data is added to IPFS.
    try:
        wrapped_dek = wrap_dek(dek)
    except KeyWrapError as e:
        make_log("tus-hook", f"Key wrap failed: {e}", level="error")
        await _mark_upload_failed(upload_id, e)
        return response.json({"ok": False, "error": "KEY_WRAP_FAILED"}, status=500)

    # Stream encrypt into IPFS add
    try:
        with open(file_path, 'rb') as f:
            result = await add_streamed_file(
                encrypt_file_to_encf(f, dek, CHUNK_BYTES, salt),
                filename=os.path.basename(file_path),
                params={},
            )
    except Exception as e:
        make_log("tus-hook", f"Encrypt+add failed: {e}", level="error")
        await _mark_upload_failed(upload_id, e)
        return response.json({"ok": False, "error": "ENCRYPT_ADD_FAILED"}, status=500)

    encrypted_cid = result.get("Hash")
    enc_size = _int_or_none(result.get("Size"))

    encrypted_cid_obj, cid_err = resolve_content(encrypted_cid)
    if cid_err:
        make_log("tus-hook", f"Encrypted CID resolve failed: {cid_err}", level="error")
        # Mark the session failed like the other error paths, so it does not
        # stay in 'processing' forever.
        await _mark_upload_failed(upload_id, cid_err)
        return response.json({"ok": False, "error": "INVALID_ENCRYPTED_CID"}, status=500)
    encrypted_hash_b58 = encrypted_cid_obj.content_hash_b58

    # request.ctx may carry no user at all; resolve it once and reuse it below.
    user = _request_user(request)
    user_id = user.id if user else None

    # Persist records
    async with db_session() as session:
        ec = EncryptedContent(
            encrypted_cid=encrypted_cid,
            title=title,
            artist=artist or None,
            description=description,
            content_type=content_type,
            enc_size_bytes=enc_size,
            plain_size_bytes=os.path.getsize(file_path),
            preview_enabled=preview_enabled,
            preview_conf=({"duration_ms": dur_ms, "intervals": [[start_ms, start_ms + dur_ms]]} if preview_enabled else {}),
            aead_scheme="AES_GCM",
            chunk_bytes=CHUNK_BYTES,
            salt_b64=_b64(salt),
        )
        session.add(ec)
        await session.flush()  # flush so ec.id is available for the rows below

        ck = ContentKey(
            content_id=ec.id,
            key_ciphertext_b64=wrapped_dek,
            key_fingerprint=key_fpr,
            issuer_node_id=key_fpr,
            allow_auto_grant=True,
        )
        session.add(ck)
        await session.flush()

        sync = IpfsSync(
            content_id=ec.id,
            pin_state='pinned',
            bytes_total=enc_size,
            bytes_fetched=enc_size,
            pinned_at=datetime.utcnow(),
        )
        session.add(sync)

        existing_encrypted_content = (await session.execute(
            select(StoredContent).where(StoredContent.hash == encrypted_hash_b58)
        )).scalars().first()
        if not existing_encrypted_content:
            placeholder_meta = {
                'content_type': content_type,
                'storage': 'ipfs',
                'encrypted_cid': encrypted_cid,
                'upload_id': upload_id,
                'source': 'tusd',
                'title': title,
                'artist': artist or None,
            }
            encrypted_stored_content = StoredContent(
                type="local/encrypted_ipfs",
                hash=encrypted_hash_b58,
                content_id=encrypted_cid,
                filename=os.path.basename(file_path),
                meta=placeholder_meta,
                user_id=user_id,
                owner_address=None,
                encrypted=True,
                decrypted_content_id=None,
                key_id=None,
                created=datetime.utcnow(),
            )
            session.add(encrypted_stored_content)

        # Publish signed index item
        item = {
            "encrypted_cid": encrypted_cid,
            "title": title,
            "description": description,
            "artist": artist or None,
            "content_type": content_type,
            "size_bytes": enc_size,
            "preview_enabled": preview_enabled,
            "preview_conf": ec.preview_conf,
            "issuer_node_id": key_fpr,
            "salt_b64": _b64(salt),
        }
        sig = _sign_index_item(item)
        session.add(ContentIndexItem(encrypted_cid=encrypted_cid, payload=item, sig=sig))

        # Event recording is best-effort: a failure is logged and the commit still runs.
        try:
            await record_event(
                session,
                'content_uploaded',
                {
                    'encrypted_cid': encrypted_cid,
                    'content_hash': encrypted_hash_b58,
                    'title': title,
                    'description': description,
                    'content_type': content_type,
                    'size_bytes': enc_size,
                    'user_id': user_id,
                    'telegram_id': getattr(user, 'telegram_id', None),
                },
                origin_host=PROJECT_HOST,
            )
        except Exception as exc:
            make_log("Events", f"Failed to record content_uploaded event: {exc}", level="warning")

        await session.commit()

    # Update upload session with result and purge staging to avoid duplicates
    async with db_session() as session:
        if upload_id:
            us = await session.get(UploadSession, upload_id)
            if us:
                us.state = 'pinned'
                us.encrypted_cid = encrypted_cid
                us.error = None
                if size:
                    us.size_bytes = size
                # prefer using IPFS for downstream conversion; remove staging
                try:
                    if file_path and os.path.exists(file_path):
                        os.remove(file_path)
                except Exception as e:
                    # Best-effort cleanup: a leftover staging file must not fail the hook.
                    make_log("tus-hook", f"Failed to remove staging file {file_path}: {e}", level="warning")
                us.storage_path = None
                await session.commit()

    make_log("tus-hook", f"Uploaded+encrypted {file_path} -> {encrypted_cid}")

    # Write a small JSON placeholder under UPLOADS_DIR, named after the content hash,
    # unless a file with that name already exists.
    placeholder_path = os.path.join(UPLOADS_DIR, encrypted_hash_b58)
    if not os.path.exists(placeholder_path):
        try:
            async with aiofiles.open(placeholder_path, "wb") as ph:
                await ph.write(json.dumps({
                    "ipfs_cid": encrypted_cid,
                    "note": "Encrypted payload stored in IPFS"
                }).encode())
        except Exception as e:
            make_log("tus-hook", f"Failed to create placeholder for {encrypted_hash_b58}: {e}", level="warning")

    return response.json({"ok": True, "encrypted_cid": encrypted_cid, "upload_id": upload_id})