from sanic import response
from sqlalchemy import select

from app.core.models.content_v3 import UploadSession, EncryptedContent, ContentDerivative
from app.core._utils.resolve_content import resolve_content


def _required_kinds(content_type):
    """Return the set of derivative kinds expected for *content_type*."""
    mime = content_type or ""
    if mime.startswith("audio/"):
        return {"decrypted_high", "decrypted_low"}
    if mime.startswith("video/"):
        return {"decrypted_high", "decrypted_low", "decrypted_preview"}
    return {"decrypted_original"}


def _conversion_state(required, statuses):
    """Fold the statuses of the *required* derivative kinds into one state.

    Precedence: all required kinds "ready" -> "ready"; any "failed" ->
    "failed"; any "processing"/"pending" -> "processing"; otherwise
    "pending". An empty *required* set yields "not_started".
    """
    observed = [statuses.get(kind) for kind in required]
    if not observed:
        return "not_started"
    if all(status == "ready" for status in observed):
        return "ready"
    if any(status == "failed" for status in observed):
        return "failed"
    if any(status in ("processing", "pending") for status in observed):
        return "processing"
    # Required kinds exist but none has a recognised in-flight status yet.
    return "pending"


async def s_api_v1_upload_status(request, upload_id: str):
    """Report the state of an upload session and of its derivative conversion.

    Looks up the ``UploadSession`` identified by *upload_id* through
    ``request.ctx.db_session``. Responds with a 404 JSON body
    ``{"error": "NOT_FOUND"}`` when there is no such session. Otherwise
    responds with the session's id, state, encrypted CID, resolved content
    hash (``None`` when the CID is absent or fails to resolve), size, error
    and a ``conversion`` object holding an aggregate state plus the
    per-derivative ``kind``/``status`` details.
    """
    db = request.ctx.db_session
    upload = await db.get(UploadSession, upload_id)
    if not upload:
        return response.json({"error": "NOT_FOUND"}, status=404)

    encrypted_hash = None
    conversion = {"state": "not_started", "details": []}

    cid = upload.encrypted_cid
    if cid:
        resolved, resolve_err = resolve_content(cid)
        if not resolve_err:
            encrypted_hash = resolved.content_hash_b58

        content_query = select(EncryptedContent).where(
            EncryptedContent.encrypted_cid == cid
        )
        content = (await db.execute(content_query)).scalars().first()
        if content:
            derivative_query = select(
                ContentDerivative.kind, ContentDerivative.status
            ).where(ContentDerivative.content_id == content.id)
            derivatives = (await db.execute(derivative_query)).all()

            # One pass builds both the response details and the kind -> status
            # lookup; for duplicate kinds the last row wins in the lookup.
            details = []
            statuses = {}
            for kind, status in derivatives:
                details.append({"kind": kind, "status": status})
                statuses[kind] = status

            conversion = {
                "state": _conversion_state(
                    _required_kinds(content.content_type), statuses
                ),
                "details": details,
            }

    return response.json({
        "id": upload.id,
        "state": upload.state,
        "encrypted_cid": upload.encrypted_cid,
        "encrypted_hash": encrypted_hash,
        "size_bytes": upload.size_bytes,
        "error": upload.error,
        "conversion": conversion,
    })