uploader-bot/app/api/routes/content_access_routes.py

176 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import StreamingResponse, JSONResponse
from app.core.access.content_access_manager import ContentAccessManager
from app.core._blockchain.ton.nft_license_manager import NFTLicenseManager
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/content", tags=["content-access"])
def _json_ok(data: Dict[str, Any]) -> JSONResponse:
return JSONResponse({"success": True, "data": data})
@router.post("/request-access")
async def request_access(body: Dict[str, Any]):
"""
POST /api/content/request-access
Тело:
{
"content_id": "sha256...",
"ton_proof": {
"address": "...", "public_key": "...", "timestamp": 0,
"domain_val": "...", "domain_len": 0, "payload": "...", "signature": "..."
},
"nft_address": "EQ...." (optional),
"token_ttl_sec": 600 (optional)
}
Ответ:
{"success": true, "data": {"token": "...", "expires_at": 0, "owner_address": "...", "nft_item": {...}}}
"""
try:
content_id = body.get("content_id")
ton_proof = body.get("ton_proof") or {}
nft_address = body.get("nft_address")
token_ttl_sec = body.get("token_ttl_sec")
if not content_id:
raise HTTPException(status_code=400, detail="content_id is required")
if not ton_proof:
raise HTTPException(status_code=400, detail="ton_proof is required")
mgr = ContentAccessManager(nft_manager=NFTLicenseManager())
ok, err, payload = await mgr.grant_access(
ton_proof=ton_proof,
content_id=content_id,
nft_address=nft_address,
token_ttl_sec=token_ttl_sec,
)
if not ok:
raise HTTPException(status_code=403, detail=err or "Access denied")
return _json_ok(payload)
except HTTPException:
raise
except Exception as e:
logger.exception("request_access failed")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/verify-license")
async def verify_license(body: Dict[str, Any]):
"""
POST /api/content/verify-license
Тело:
{
"content_id": "sha256...",
"ton_proof": { ... as above ... },
"nft_address": "EQ...." (optional)
}
Ответ:
{"success": true, "data": {"valid": true, "owner_address": "...", "nft_item": {...}}}
"""
try:
content_id = body.get("content_id")
ton_proof = body.get("ton_proof") or {}
nft_address = body.get("nft_address")
if not content_id:
raise HTTPException(status_code=400, detail="content_id is required")
if not ton_proof:
raise HTTPException(status_code=400, detail="ton_proof is required")
nft_mgr = NFTLicenseManager()
ok, err, nft_item = await nft_mgr.check_license_validity(
ton_proof=ton_proof, content_id=content_id, nft_address=nft_address
)
if not ok:
return _json_ok({"valid": False, "error": err})
# Извлечем адрес владельца для удобства клиента
owner_address = None
try:
# небольшой импорт без цикла, чтобы не тянуть все сверху
from app.core.access.content_access_manager import nft_proof_owner # noqa
owner_address = nft_proof_owner(ton_proof)
except Exception:
owner_address = None
return _json_ok({"valid": True, "owner_address": owner_address, "nft_item": nft_item})
except HTTPException:
raise
except Exception as e:
logger.exception("verify_license failed")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/stream/{content_id}")
async def stream_content(
request: Request,
content_id: str,
token: str = Query(..., description="Временный токен, полученный через /request-access"),
):
"""
GET /api/content/stream/{content_id}?token=...
Возвращает поток расшифрованного контента при валидном временном токене.
Примечание:
- Здесь требуется провайдер ключа контента (content_key_provider), который по content_id вернет 32-байтовый ключ.
В текущем сервисе ключ не выдается из NFT, он хранится в ноде/сети вне блокчейна и не возвращается клиенту.
- В данном роуте показан каркас: откуда читать зашифрованные данные (encrypted_obj) зависит от вашей БД/фс.
"""
try:
mgr = ContentAccessManager()
# Заглушка чтения зашифрованного объекта контента.
# Здесь нужно интегрировать фактическое хранилище, например БД/файловую систему, и извлечь объект,
# совместимый с ContentCipher.decrypt_content входом.
# Формат encrypted_obj:
# {
# "ciphertext_b64": "...",
# "nonce_b64": "...",
# "tag_b64": "...",
# "metadata": {...},
# "content_id": "sha256..."
# }
encrypted_obj: Optional[Dict[str, Any]] = None
if not encrypted_obj:
raise HTTPException(status_code=404, detail="Encrypted content not found")
# Провайдер ключа шифрования по content_id — внедрите вашу реализацию
def content_key_provider(cid: str) -> bytes:
# Должен вернуть 32-байтовый ключ (из secure-хранилища узла)
# raise NotImplementedError / или извлечение из KMS/базы
raise HTTPException(status_code=501, detail="content_key_provider is not configured")
ok, err, pt = mgr.decrypt_for_stream(
encrypted_obj=encrypted_obj,
content_key_provider=content_key_provider,
token=token,
content_id=content_id,
associated_data=None,
)
if not ok or pt is None:
raise HTTPException(status_code=403, detail=err or "Access denied")
async def stream_bytes():
# Простейшая потоковая отдача всего буфера.
# Для больших данных отдавайте чанками.
yield pt
# Тип контента может определяться по metadata или по хранимому mime-type
return StreamingResponse(stream_bytes(), media_type="application/octet-stream")
except HTTPException:
raise
except Exception as e:
logger.exception("stream_content failed")
raise HTTPException(status_code=500, detail=str(e))