uploader-bot/app/core/converter/conversion_manager.py

271 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import base64
import logging
import os
import time
import uuid
from dataclasses import asdict
from typing import Dict, Any, Optional, List, Tuple
from app.core.converter.converter_client import ConverterClient
from app.core.crypto.content_cipher import ContentCipher
from app.core.content.chunk_manager import ChunkManager
from app.core.models.converter.conversion_models import (
ConversionTask,
ConversionResult,
ConversionStatus,
ConversionPriority,
ContentMetadata,
)
from app.core.stats.metrics_collector import MetricsCollector
logger = logging.getLogger(__name__)
class _PriorityQueue:
"""
Простая приоритетная очередь на базе asyncio.PriorityQueue.
Чем больше приоритет, тем раньше задача (инвертируем знак).
"""
def __init__(self) -> None:
self._q: asyncio.PriorityQueue[Tuple[int, str, ConversionTask]] = asyncio.PriorityQueue()
self._counter = 0 # стабилизация порядка
async def put(self, task: ConversionTask) -> None:
self._counter += 1
# Инвертируем, чтобы HIGH(90) шел раньше LOW(10)
await self._q.put((-int(task.priority), self._counter, task))
async def get(self) -> ConversionTask:
p, _, t = await self._q.get()
return t
def empty(self) -> bool:
return self._q.empty()
class ConversionManager:
"""
Управляет жизненным циклом конвертации:
- постановка в очередь (приоритет)
- запуск через ConverterClient
- post-processing: шифрование ContentCipher, чанкинг ChunkManager
- retry при ошибках
- метрики через MetricsCollector
"""
def __init__(
self,
converter_client: Optional[ConverterClient] = None,
metrics: Optional[MetricsCollector] = None,
concurrent_limit: int = 2,
) -> None:
self._client = converter_client or ConverterClient()
self._cipher = ContentCipher()
self._chunker = ChunkManager(self._cipher)
self._metrics = metrics or MetricsCollector()
self._queue = _PriorityQueue()
self._inflight: Dict[str, ConversionTask] = {}
self._results: Dict[str, ConversionResult] = {}
self._lock = asyncio.Lock()
self._sem = asyncio.Semaphore(concurrent_limit)
# -------------------- Public API --------------------
async def process_upload(
self,
local_input_path: str,
input_ext: str,
quality: str,
metadata: ContentMetadata,
priority: ConversionPriority = ConversionPriority.NORMAL,
custom: Optional[List[str]] = None,
trim: Optional[str] = None,
max_retries: int = 3,
) -> str:
"""
Точка входа из API: ставит задачу в очередь и возвращает task_id.
"""
task_id = str(uuid.uuid4())
task = ConversionTask(
task_id=task_id,
input_path=local_input_path,
input_ext=input_ext,
quality="high" if quality == "high" else "low",
trim=trim,
custom=custom or [],
priority=priority,
max_retries=max_retries,
metadata=metadata,
)
await self.queue_conversion(task)
return task_id
async def queue_conversion(self, task: ConversionTask) -> None:
logger.info("Queue conversion task_id=%s priority=%s", task.task_id, task.priority)
await self._queue.put(task)
await self._metrics.inc_requests()
async def get_conversion_status(self, task_id: str) -> ConversionStatus:
async with self._lock:
res = self._results.get(task_id)
if res:
return res.status
if task_id in self._inflight:
return ConversionStatus.RUNNING
# иначе он в очереди
return ConversionStatus.QUEUED
async def handle_conversion_result(self, task_id: str) -> Optional[ConversionResult]:
"""
Возвращает итоговый ConversionResult если уже готов.
"""
async with self._lock:
return self._results.get(task_id)
# -------------------- Worker logic --------------------
async def _run_single(self, task: ConversionTask) -> None:
"""
Полный цикл одной задачи: запуск конвертера, шифрование, чанкинг, сохранение результата.
"""
start_ts = time.time()
async with self._sem:
async with self._lock:
self._inflight[task.task_id] = task
try:
# 1) Запуск конвертера
await self._metrics.observe_latency_ms(1) # лёгкий трейс
await self._client.submit_conversion(task, task.input_path)
# 2) Ожидание завершения: опрашиваем статус, затем забираем результат
status = await self._poll_until_done(task.task_id)
conv_res = await self._client.download_result(task.task_id)
if status != ConversionStatus.SUCCESS or conv_res.status != ConversionStatus.SUCCESS:
raise RuntimeError(conv_res.error or "conversion failed")
# 3) Прочитать выходной файл и выполнить шифрование + чанкинг
output_path = conv_res.converter_output_path
if not output_path or not os.path.exists(output_path):
raise FileNotFoundError("converted output not found")
with open(output_path, "rb") as f:
converted_bytes = f.read()
# Шифрование полной сущности перед чанкингом
content_key = self._cipher.generate_content_key()
encrypted_obj = self._cipher.encrypt_content(
plaintext=converted_bytes,
key=content_key,
metadata={
"title": task.metadata.title,
"author": task.metadata.author,
"description": task.metadata.description,
"attributes": task.metadata.attributes,
"quality": task.quality,
"source_ext": task.input_ext,
},
)
content_id = encrypted_obj["content_id"]
# Для дедупликации и совместимости чанкуем уже шифротекст по архитектуре:
# Используем nonce/tag каждого чанка отдельно (ChunkManager делает encrypt_content для каждого чанка).
# Но нам нужен plaintext для разбиения на куски до шифрования? В архитектуре зашифрованные чанки требуются.
# Следуем текущей реализации ChunkManager: он сам шифрует куски.
chunks = self._chunker.split_content(
content_id=content_id,
plaintext=converted_bytes,
content_key=content_key,
metadata={
"nft_title": task.metadata.title,
"nft_author": task.metadata.author,
"quality": task.quality,
},
)
# Сериализуем чанки для отдачи через API
chunks_serialized = [asdict(c) for c in chunks]
nft_metadata = {
"name": task.metadata.title,
"description": task.metadata.description,
"author": task.metadata.author,
"attributes": task.metadata.attributes,
"tags": task.metadata.tags,
"collection": task.metadata.collection,
"external_url": None,
}
result = ConversionResult(
task_id=task.task_id,
status=ConversionStatus.SUCCESS,
converter_output_path=output_path,
logs_path=None,
content_id=content_id,
chunks=chunks_serialized,
nft_metadata=nft_metadata,
finished_at=int(time.time()),
)
async with self._lock:
self._results[task.task_id] = result
self._inflight.pop(task.task_id, None)
await self._metrics.inc_conversions()
await self._metrics.observe_latency_ms((time.time() - start_ts) * 1000.0)
logger.info("Conversion completed: task_id=%s content_id=%s chunks=%d",
task.task_id, content_id, len(chunks))
except Exception as e:
logger.exception("Conversion task %s failed: %s", task.task_id, e)
task.attempts += 1
if task.attempts <= task.max_retries:
# Retry: возвращаем задачу в очередь с тем же приоритетом (экспоненциальная пауза)
backoff = min(2 ** (task.attempts - 1), 30)
await asyncio.sleep(backoff)
await self._queue.put(task)
await self._metrics.inc_errors()
else:
fail_res = ConversionResult(
task_id=task.task_id,
status=ConversionStatus.FAILED,
error=str(e),
finished_at=int(time.time()),
)
async with self._lock:
self._results[task.task_id] = fail_res
self._inflight.pop(task.task_id, None)
await self._metrics.inc_errors()
async def _poll_until_done(self, task_id: str, interval_sec: float = 1.0, timeout_sec: float = 3600.0) -> ConversionStatus:
"""
Простой polling статуса процесса конвертера.
"""
start = time.time()
while True:
status = await self._client.get_conversion_status(task_id)
if status in (ConversionStatus.SUCCESS, ConversionStatus.FAILED, ConversionStatus.CANCELED):
return status
if time.time() - start > timeout_sec:
return ConversionStatus.FAILED
await asyncio.sleep(interval_sec)
# -------------------- Scheduler loop --------------------
async def run_scheduler(self, shutdown_event: Optional[asyncio.Event] = None) -> None:
"""
Основной цикл: достаёт из очереди и обрабатывает задачи.
"""
while True:
if shutdown_event and shutdown_event.is_set():
break
try:
task = await self._queue.get()
asyncio.create_task(self._run_single(task))
except Exception as e:
logger.error("Scheduler loop error: %s", e)
await asyncio.sleep(1.0)