uploader-bot/app/api/routes/upload_tus.py

278 lines
10 KiB
Python

from __future__ import annotations
import base64
import json
import os
from datetime import datetime
from typing import Dict, Any
import aiofiles
from base58 import b58encode
from sanic import response
from app.core._config import UPLOADS_DIR
from app.core._secrets import hot_pubkey
from app.core.crypto.aes_gcm_stream import encrypt_file_to_encf, CHUNK_BYTES
from app.core.crypto.keywrap import wrap_dek, KeyWrapError
from app.core.ipfs_client import add_streamed_file
from app.core.logger import make_log
from app.core.models.content_v3 import EncryptedContent, ContentKey, IpfsSync, ContentIndexItem, UploadSession
from app.core.models.node_storage import StoredContent
from app.core.storage import db_session
from app.core._utils.resolve_content import resolve_content
from sqlalchemy import select
def _b64(s: bytes) -> str:
return base64.b64encode(s).decode()
def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _hook_event_name(payload: Dict[str, Any], headers: Any) -> Any:
    """Return the first truthy hook/event name found in the payload or the Hook-Name header."""
    for key in ("Type", "type", "Event", "event", "Hook", "hook", "HookName", "hook_name"):
        candidate = payload.get(key)
        if candidate:
            return candidate
    return headers.get("Hook-Name") or headers.get("hook-name")


def _hook_upload(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return the upload description dict, top-level or nested under the event envelope."""
    upload = payload.get("Upload") or payload.get("upload")
    if not upload:
        # Envelope-style hook bodies carry the upload under "Event" instead of the top level.
        envelope = payload.get("Event") or payload.get("event")
        if isinstance(envelope, dict):
            upload = envelope.get("Upload") or envelope.get("upload")
    return _as_dict(upload)


async def _mark_upload_failed(upload_id: Any, error: str) -> None:
    """Flag the UploadSession identified by *upload_id* as failed and store *error* on it."""
    if not upload_id:
        return
    async with db_session() as session:
        us = await session.get(UploadSession, upload_id)
        if us:
            us.state = 'failed'
            us.error = error
            await session.commit()


async def s_api_v1_upload_tus_hook(request):
    """
    tusd HTTP hook endpoint. We mainly handle post-finish to: encrypt -> IPFS add+pin -> record DB.

    Args:
        request: the incoming Sanic request carrying the hook JSON body.

    Returns:
        A JSON response:
        - ``{"ok": True, "skipped": True}`` when no event name can be determined;
        - ``{"ok": True}`` for any event other than post-finish;
        - status 400 with ``NO_STORAGE_PATH`` when the upload has no storage path;
        - status 500 with ``KEY_WRAP_FAILED``, ``ENCRYPT_ADD_FAILED`` or
          ``INVALID_ENCRYPTED_CID`` when the corresponding step fails (the upload
          session, if any, is marked ``failed``);
        - ``{"ok": True, "encrypted_cid": ..., "upload_id": ...}`` on success.
    """
    try:
        payload: Dict[str, Any] = request.json
    except Exception:
        payload = None
    if payload is None:
        raw_body = request.body or b''
        try:
            payload = json.loads(raw_body) if raw_body else {}
        except Exception:
            payload = {}
    # A JSON body that is not an object (list, string, number) carries no hook fields.
    payload = _as_dict(payload)

    event = _hook_event_name(payload, request.headers)
    upload = _hook_upload(payload)
    if not event:
        hook_name = (payload.get("HookName") or payload.get("hook") or
                     payload.get("hook_name") or request.headers.get("Hook-Name"))
        raw = request.body or b''
        preview = raw[:512]
        make_log("tus-hook", f"Missing event type in hook payload; ignoring (hook={hook_name}, keys={list(payload.keys())}, raw={preview!r})", level="warning")
        return response.json({"ok": True, "skipped": True})
    if event not in ("post-finish", "postfinish"):
        # accept but ignore other events
        return response.json({"ok": True})

    # Extract storage path from tusd payload
    storage = _as_dict(upload.get("Storage"))
    file_path = storage.get("Path") or storage.get("path")
    if not file_path:
        return response.json({"ok": False, "error": "NO_STORAGE_PATH"}, status=400)

    meta = _as_dict(upload.get("MetaData"))
    # Common metadata keys
    title = meta.get("title") or meta.get("Title") or meta.get("name") or "Untitled"
    artist = (meta.get("artist") or meta.get("Artist") or "").strip()
    description = meta.get("description") or meta.get("Description") or ""
    content_type = meta.get("content_type") or meta.get("Content-Type") or "application/octet-stream"
    preview_enabled = content_type.startswith(("audio/", "video/"))
    # Optional preview window overrides from tus metadata
    try:
        start_ms = int(meta.get("preview_start_ms") or 0)
        dur_ms = int(meta.get("preview_duration_ms") or 30000)
    except (TypeError, ValueError, OverflowError):
        start_ms, dur_ms = 0, 30000

    # Record/Update upload session
    upload_id = upload.get("ID") or upload.get("Id") or upload.get("id")
    try:
        size = int(upload.get("Size") or 0)
    except (TypeError, ValueError, OverflowError):
        size = None
    async with db_session() as session:
        us = (await session.get(UploadSession, upload_id)) if upload_id else None
        if not us and upload_id:
            us = UploadSession(
                id=upload_id,
                filename=os.path.basename(file_path),
                size_bytes=size,
                state='processing',
                encrypted_cid=None,
            )
            session.add(us)
            await session.commit()

    # Read & encrypt by streaming (ENCF v1 / AES-GCM)
    # Generate per-content random DEK and salt
    dek = os.urandom(32)
    salt = os.urandom(16)
    key_fpr = b58encode(hot_pubkey).decode()  # fingerprint as our node id for now

    try:
        wrapped_dek = wrap_dek(dek)
    except KeyWrapError as e:
        make_log("tus-hook", f"Key wrap failed: {e}", level="error")
        await _mark_upload_failed(upload_id, str(e))
        return response.json({"ok": False, "error": "KEY_WRAP_FAILED"}, status=500)

    # Stream encrypt into IPFS add
    try:
        with open(file_path, 'rb') as f:
            result = await add_streamed_file(
                encrypt_file_to_encf(f, dek, CHUNK_BYTES, salt),
                filename=os.path.basename(file_path),
                params={},
            )
    except Exception as e:
        make_log("tus-hook", f"Encrypt+add failed: {e}", level="error")
        await _mark_upload_failed(upload_id, str(e))
        return response.json({"ok": False, "error": "ENCRYPT_ADD_FAILED"}, status=500)

    encrypted_cid = result.get("Hash")
    try:
        enc_size = int(result.get("Size") or 0)
    except (TypeError, ValueError, OverflowError):
        enc_size = None

    encrypted_cid_obj, cid_err = resolve_content(encrypted_cid)
    if cid_err:
        make_log("tus-hook", f"Encrypted CID resolve failed: {cid_err}", level="error")
        # Keep the session consistent with the other failure paths instead of
        # leaving it in 'processing' forever.
        await _mark_upload_failed(upload_id, f"Invalid encrypted CID: {cid_err}")
        return response.json({"ok": False, "error": "INVALID_ENCRYPTED_CID"}, status=500)
    encrypted_hash_b58 = encrypted_cid_obj.content_hash_b58

    # request.ctx may have no `user` attribute when no middleware populated it.
    user = getattr(request.ctx, "user", None)

    # Persist records
    async with db_session() as session:
        ec = EncryptedContent(
            encrypted_cid=encrypted_cid,
            title=title,
            artist=artist or None,
            description=description,
            content_type=content_type,
            enc_size_bytes=enc_size,
            plain_size_bytes=os.path.getsize(file_path),
            preview_enabled=preview_enabled,
            preview_conf=({"duration_ms": dur_ms, "intervals": [[start_ms, start_ms + dur_ms]]} if preview_enabled else {}),
            aead_scheme="AES_GCM",
            chunk_bytes=CHUNK_BYTES,
            salt_b64=_b64(salt),
        )
        session.add(ec)
        # Flush so ec.id is available for the dependent rows below.
        await session.flush()

        ck = ContentKey(
            content_id=ec.id,
            key_ciphertext_b64=wrapped_dek,
            key_fingerprint=key_fpr,
            issuer_node_id=key_fpr,
            allow_auto_grant=True,
        )
        session.add(ck)
        await session.flush()

        sync = IpfsSync(
            content_id=ec.id,
            pin_state='pinned',
            bytes_total=enc_size,
            bytes_fetched=enc_size,
            pinned_at=datetime.utcnow(),
        )
        session.add(sync)

        existing_encrypted_content = (await session.execute(
            select(StoredContent).where(StoredContent.hash == encrypted_hash_b58)
        )).scalars().first()
        if not existing_encrypted_content:
            placeholder_meta = {
                'content_type': content_type,
                'storage': 'ipfs',
                'encrypted_cid': encrypted_cid,
                'upload_id': upload_id,
                'source': 'tusd',
                'title': title,
                'artist': artist or None,
            }
            encrypted_stored_content = StoredContent(
                type="local/encrypted_ipfs",
                hash=encrypted_hash_b58,
                content_id=encrypted_cid,
                filename=os.path.basename(file_path),
                meta=placeholder_meta,
                user_id=user.id if user else None,
                owner_address=None,
                encrypted=True,
                decrypted_content_id=None,
                key_id=None,
                created=datetime.utcnow(),
            )
            session.add(encrypted_stored_content)

        # Publish signed index item.
        # "artist" appears once, with the value the former duplicate key resolved to.
        item = {
            "encrypted_cid": encrypted_cid,
            "title": title,
            "description": description,
            "artist": artist or None,
            "content_type": content_type,
            "size_bytes": enc_size,
            "preview_enabled": preview_enabled,
            "preview_conf": ec.preview_conf,
            "issuer_node_id": key_fpr,
            "salt_b64": _b64(salt),
        }
        try:
            from app.core._crypto.signer import Signer
            from app.core._secrets import hot_seed
            signer = Signer(hot_seed)
            blob = json.dumps(item, sort_keys=True, separators=(",", ":")).encode()
            sig = signer.sign(blob)
        except Exception as e:
            # Signing stays best-effort, but an unsigned index item should be visible in logs.
            make_log("tus-hook", f"Index item signing failed; storing unsigned item: {e}", level="warning")
            sig = ""
        session.add(ContentIndexItem(encrypted_cid=encrypted_cid, payload=item, sig=sig))
        await session.commit()

    # Update upload session with result and purge staging to avoid duplicates
    async with db_session() as session:
        if upload_id:
            us = await session.get(UploadSession, upload_id)
            if us:
                us.state = 'pinned'
                us.encrypted_cid = encrypted_cid
                us.error = None
                if size:
                    us.size_bytes = size
                # prefer using IPFS for downstream conversion; remove staging
                try:
                    if file_path and os.path.exists(file_path):
                        os.remove(file_path)
                except Exception as e:
                    # Best-effort cleanup: a leftover staging file must not fail the hook.
                    make_log("tus-hook", f"Failed to remove staging file {file_path}: {e}", level="warning")
                us.storage_path = None
                await session.commit()

    make_log("tus-hook", f"Uploaded+encrypted {file_path} -> {encrypted_cid}")

    # Drop a small JSON placeholder under UPLOADS_DIR named after the content hash.
    placeholder_path = os.path.join(UPLOADS_DIR, encrypted_hash_b58)
    if not os.path.exists(placeholder_path):
        try:
            async with aiofiles.open(placeholder_path, "wb") as ph:
                await ph.write(json.dumps({
                    "ipfs_cid": encrypted_cid,
                    "note": "Encrypted payload stored in IPFS"
                }).encode())
        except Exception as e:
            make_log("tus-hook", f"Failed to create placeholder for {encrypted_hash_b58}: {e}", level="warning")

    return response.json({"ok": True, "encrypted_cid": encrypted_cid, "upload_id": upload_id})