"""convert_v3 worker: decrypt ENCF content from IPFS and build media derivatives.

Pipeline:
    _pick_pending   -- find A/V EncryptedContent rows that still miss ready
                       derivatives and stage a decrypted plaintext file
                       (local ContentKey, or a DEK fetched from a known peer).
    worker_loop     -- fan staged jobs out under a semaphore.
    _convert_content-- run the dockerized ``media_converter`` once per quality
                       option, store the result under UPLOADS_DIR and record
                       StoredContent / ContentDerivative rows.
"""

import asyncio
import hashlib
import json
import os
import shutil
import tempfile
import uuid
from datetime import datetime
from typing import List, Optional, Tuple

import base58
from sqlalchemy import select

from app.core.logger import make_log
from app.core.storage import db_session
from app.core._config import UPLOADS_DIR, BACKEND_LOGS_DIR_HOST
from app.core.models.content_v3 import (
    EncryptedContent,
    ContentKey,
    ContentDerivative,
    UploadSession,
)
from app.core.models.node_storage import StoredContent
from app.core.ipfs_client import cat_stream
from app.core.crypto.encf_stream import decrypt_encf_auto
from app.core.crypto.keywrap import unwrap_dek, wrap_dek, KeyWrapError
from app.core.network.key_client import request_key_from_peer
from app.core.models.my_network import KnownNode

# Max number of conversion jobs running at once (env-tunable).
CONCURRENCY = int(os.getenv("CONVERT_V3_MAX_CONCURRENCY", "3"))


def _ensure_dir(path: str) -> None:
    """Create *path* (with parents) if missing; deliberate best-effort.

    Failures are swallowed: a later open/write against the directory will
    surface a clearer error than makedirs would here.
    """
    try:
        os.makedirs(path, exist_ok=True)
    except Exception:
        pass


def _derivative_kind(opt: str) -> str:
    """Map a quality option name to its ContentDerivative.kind value."""
    return f"decrypted_{opt if opt != 'low_preview' else 'preview'}"


async def _sha256_b58(file_path: str) -> str:
    """Return the base58-encoded SHA-256 digest of the file at *file_path*.

    NOTE(review): reads synchronously in 2 MiB chunks, so it blocks the event
    loop for the duration of the hash -- acceptable for this worker process,
    but keep in mind if reused elsewhere.
    """
    h = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(2 * 1024 * 1024), b''):
            h.update(chunk)
    return base58.b58encode(h.digest()).decode()


async def _save_derivative(file_path: str, filename: str) -> Tuple[str, int]:
    """Move file into UPLOADS_DIR under sha256 b58 name; return (hash_b58, size).

    *filename* is currently unused; kept for interface compatibility with
    existing callers.
    """
    file_hash = await _sha256_b58(file_path)
    dst = os.path.join(UPLOADS_DIR, file_hash)
    # Content-addressed destination: replacing an existing file with the same
    # hash is a no-op content-wise, so removing first is safe.
    try:
        os.remove(dst)
    except FileNotFoundError:
        pass
    shutil.move(file_path, dst)
    return file_hash, os.path.getsize(dst)


async def _run_media_converter(input_host_path: str, input_ext: str,
                               quality: str, trim_value: Optional[str],
                               is_audio: bool) -> Tuple[str, dict]:
    """Run the ``media_converter`` docker image over *input_host_path*.

    Returns ``(output_media_path, ffprobe_meta)``.  The output file lives in
    a per-run temp directory (``/tmp/conv_<rid>``); the caller is responsible
    for moving the file out and removing the directory afterwards.

    Raises:
        RuntimeError: on non-zero container exit, unreadable output dir,
            or an unexpected number of produced media files.
    """
    rid = uuid.uuid4().hex[:8]
    output_dir_host = f"/tmp/conv_{rid}"
    _ensure_dir(output_dir_host)
    _ensure_dir(BACKEND_LOGS_DIR_HOST)

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{input_host_path}:/app/input:ro",
        "-v", f"{output_dir_host}:/app/output",
        "-v", f"{BACKEND_LOGS_DIR_HOST}:/app/logs",
        "media_converter",
        "--ext", input_ext,
        "--quality", quality,
    ]
    if trim_value:
        cmd.extend(["--trim", trim_value])
    if is_audio:
        cmd.append("--audio-only")

    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"media_converter failed: {stderr.decode()}")

    # Find produced media file and optional output.json (ffprobe metadata).
    try:
        files = os.listdir(output_dir_host)
    except Exception as e:
        raise RuntimeError(f"Read output dir error: {e}")
    media_files = [f for f in files if f != "output.json"]
    if len(media_files) != 1:
        raise RuntimeError(
            f"Expected one media file, found {len(media_files)}: {media_files}")
    output_media = os.path.join(output_dir_host, media_files[0])

    ffprobe_meta = {}
    out_json = os.path.join(output_dir_host, "output.json")
    if os.path.exists(out_json):
        try:
            with open(out_json, 'r') as f:
                ffprobe_meta = json.load(f)
        except Exception:
            # Metadata is optional; a corrupt output.json must not fail the job.
            ffprobe_meta = {}
    return output_media, ffprobe_meta


async def _convert_content(ec: EncryptedContent, input_host_path: str) -> None:
    """Produce all required derivatives for one content item.

    For each quality option: insert a 'processing' ContentDerivative marker,
    run the converter, store the result as content-addressed StoredContent,
    and promote the marker to 'ready'.  On failure the marker is moved to
    'failed' (or a failed row is inserted if the marker is missing) so stale
    'processing' rows do not accumulate across retries.
    """
    content_kind = ('audio' if ec.content_type.startswith('audio/')
                    else ('video' if ec.content_type.startswith('video/')
                          else 'other'))
    if content_kind == 'other':
        return

    input_ext = (ec.content_type.split('/')[-1] or 'bin')
    is_audio = content_kind == 'audio'

    # Required outputs.
    required = ['high', 'low', 'low_preview']

    # Preview interval: first configured interval, or the leading
    # duration_ms window (default 30 s).
    conf = ec.preview_conf or {}
    intervals = conf.get('intervals') or [[0, int(conf.get('duration_ms', 30000))]]
    main_interval = intervals[0]
    start_s = max(0, int(main_interval[0]) // 1000)
    dur_s = max(1, int((main_interval[1] - main_interval[0]) // 1000) or 30)
    trim_value = f"{start_s},{dur_s}"

    qualities = {'high': 'high', 'low': 'low', 'low_preview': 'low'}

    for opt in required:
        kind = _derivative_kind(opt)
        try:
            # Mark derivative processing.
            async with db_session() as session:
                cd = ContentDerivative(
                    content_id=ec.id,
                    kind=kind,
                    interval_start_ms=main_interval[0] if opt == 'low_preview' else None,
                    interval_end_ms=main_interval[1] if opt == 'low_preview' else None,
                    local_path="",
                    status='processing',
                )
                session.add(cd)
                await session.commit()

            out_path, ffprobe = await _run_media_converter(
                input_host_path=input_host_path,
                input_ext=input_ext,
                quality=qualities[opt],
                trim_value=trim_value if opt == 'low_preview' else None,
                is_audio=is_audio,
            )

            # Save into store and StoredContent.
            file_hash, size_bytes = await _save_derivative(
                out_path, os.path.basename(out_path))
            # The converter's per-run temp dir is no longer needed once the
            # media file has been moved into UPLOADS_DIR.
            shutil.rmtree(os.path.dirname(out_path), ignore_errors=True)

            async with db_session() as session:
                sc = StoredContent(
                    type="local/content_bin",
                    hash=file_hash,
                    user_id=None,
                    filename=os.path.basename(out_path),
                    meta={'encrypted_cid': ec.encrypted_cid, 'kind': opt,
                          'ffprobe_meta': ffprobe},
                    created=datetime.utcnow(),
                )
                session.add(sc)
                await session.flush()
                # Promote the 'processing' marker to 'ready'.
                cd = (await session.execute(select(ContentDerivative).where(
                    ContentDerivative.content_id == ec.id,
                    ContentDerivative.kind == kind,
                    ContentDerivative.status == 'processing'
                ))).scalars().first()
                if cd:
                    cd.local_path = os.path.join(UPLOADS_DIR, file_hash)
                    cd.size_bytes = size_bytes
                    # 'high' keeps the source content type; downscaled outputs
                    # are always mp3/mp4.
                    cd.content_type = (
                        ('audio/mpeg' if is_audio else 'video/mp4')
                        if opt != 'high' else ec.content_type)
                    cd.status = 'ready'
                await session.commit()
            make_log('convert_v3',
                     f"Converted {ec.encrypted_cid} opt={opt} -> {file_hash}")
        except Exception as e:
            make_log('convert_v3',
                     f"Convert error {ec.encrypted_cid} opt={opt}: {e}",
                     level='error')
            async with db_session() as session:
                # Fail the existing 'processing' marker instead of stacking a
                # new row on every retry; insert only if no marker is found.
                cd = (await session.execute(select(ContentDerivative).where(
                    ContentDerivative.content_id == ec.id,
                    ContentDerivative.kind == kind,
                    ContentDerivative.status == 'processing'
                ))).scalars().first()
                if cd:
                    cd.status = 'failed'
                    cd.error = str(e)
                else:
                    session.add(ContentDerivative(
                        content_id=ec.id,
                        kind=kind,
                        status='failed',
                        error=str(e),
                        local_path="",
                    ))
                await session.commit()


async def _pick_pending(limit: int) -> List[Tuple[EncryptedContent, str]]:
    """Return up to *limit* ``(content, staged_plaintext_path)`` jobs.

    A content item qualifies when preview is enabled and its required
    derivative kinds are not all 'ready'.  The plaintext is always staged by
    decrypting the IPFS stream -- first with a locally stored ContentKey,
    otherwise by requesting the DEK from each known peer (the first DEK that
    works is wrapped and persisted as a new ContentKey).
    """
    async with db_session() as session:
        # Find A/V contents with preview_enabled and no ready
        # low/low_preview derivatives yet.
        ecs = (await session.execute(select(EncryptedContent).where(
            EncryptedContent.preview_enabled == True  # noqa: E712 (SQLAlchemy)
        ).order_by(EncryptedContent.created_at.desc()))).scalars().all()

        picked: List[Tuple[EncryptedContent, str]] = []
        for ec in ecs:
            # Skip items whose required derivatives are already ready.
            rows = (await session.execute(select(ContentDerivative).where(
                ContentDerivative.content_id == ec.id))).scalars().all()
            kinds_ready = {r.kind for r in rows if r.status == 'ready'}
            required = ({'decrypted_low', 'decrypted_high'}
                        if ec.content_type.startswith('audio/')
                        else {'decrypted_low', 'decrypted_high', 'decrypted_preview'})
            if required.issubset(kinds_ready):
                continue

            # Always decrypt from IPFS using local or remote key.
            storage_path: Optional[str] = None
            ck = (await session.execute(select(ContentKey).where(
                ContentKey.content_id == ec.id))).scalars().first()
            if ck:
                storage_path = await stage_plain_from_ipfs(ec, ck.key_ciphertext_b64)
            if not storage_path:
                peers = (await session.execute(select(KnownNode))).scalars().all()
                for peer in peers:
                    base_url = f"http://{peer.ip}:{peer.port}"
                    dek = await request_key_from_peer(base_url, ec.encrypted_cid)
                    if not dek:
                        continue
                    try:
                        dek_b64 = wrap_dek(dek)
                    except KeyWrapError as exc:
                        make_log('convert_v3',
                                 f"wrap failed for peer DEK: {exc}",
                                 level='error')
                        continue
                    # Persist the peer-supplied key for future runs.
                    session_ck = ContentKey(
                        content_id=ec.id,
                        key_ciphertext_b64=dek_b64,
                        key_fingerprint=peer.public_key,
                        issuer_node_id=peer.public_key,
                        allow_auto_grant=True,
                    )
                    session.add(session_ck)
                    await session.commit()
                    storage_path = await stage_plain_from_ipfs(ec, dek_b64)
                    if storage_path:
                        break
            if not storage_path or not os.path.exists(storage_path):
                continue
            picked.append((ec, storage_path))
            if len(picked) >= limit:
                break
        return picked


async def worker_loop() -> None:
    """Main service loop: poll for pending jobs and convert them concurrently."""
    sem = asyncio.Semaphore(CONCURRENCY)

    async def _run_one(ec: EncryptedContent, input_path: str):
        async with sem:
            try:
                await _convert_content(ec, input_path)
                # After successful conversion, attempt to remove staging file
                # to avoid duplicates.
                # NOTE(review): only paths under /data/ are removed here, but
                # stage_plain_from_ipfs uses tempfile's default dir -- confirm
                # TMPDIR points into /data/, otherwise staged files leak.
                try:
                    if (input_path and input_path.startswith("/data/")
                            and os.path.exists(input_path)):
                        os.remove(input_path)
                except Exception:
                    pass
            except Exception as e:
                make_log('convert_v3',
                         f"job error {ec.encrypted_cid}: {e}", level='error')

    while True:
        try:
            batch = await _pick_pending(limit=CONCURRENCY * 2)
            if not batch:
                await asyncio.sleep(3)
                continue
            tasks = [asyncio.create_task(_run_one(ec, path))
                     for (ec, path) in batch]
            await asyncio.gather(*tasks)
        except Exception as e:
            # Top-level boundary: log and keep the service alive.
            make_log('convert_v3', f"loop error: {e}", level='error')
            await asyncio.sleep(2)


async def main_fn(memory):
    """Service entry point (``memory`` is the hosting framework's context)."""
    make_log('convert_v3',
             f"Service started with concurrency={CONCURRENCY}", level='info')
    await worker_loop()


async def stage_plain_from_ipfs(ec: EncryptedContent,
                                dek_wrapped: str) -> Optional[str]:
    """Download encrypted ENCF stream from IPFS and decrypt on the fly into a temp file.

    Returns the plaintext temp-file path, or None on unwrap/download/decrypt
    failure (the partial temp file is removed).
    """
    try:
        dek = unwrap_dek(dek_wrapped)
    except KeyWrapError as exc:
        make_log('convert_v3',
                 f"unwrap failed for {ec.encrypted_cid}: {exc}", level='error')
        return None
    tmp = tempfile.NamedTemporaryFile(
        prefix=f"dec_{ec.encrypted_cid[:8]}_", delete=False)
    tmp_path = tmp.name
    tmp.close()
    try:
        async def _aiter():
            async for ch in cat_stream(ec.encrypted_cid):
                yield ch
        await decrypt_encf_auto(_aiter(), dek, tmp_path)
        return tmp_path
    except Exception as e:
        make_log('convert_v3', f"decrypt from ipfs failed: {e}", level='error')
        try:
            os.remove(tmp_path)
        except Exception:
            pass
        return None