uploader-bot/app/core/converter/converter_client.py

214 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import json
import logging
import os
import shlex
import uuid
from dataclasses import asdict
from typing import Dict, Any, Optional, Tuple, List
from app.core.models.converter.conversion_models import ConversionTask, ConversionResult, ConversionStatus
logger = logging.getLogger(__name__)
class ConverterClient:
"""
Клиент-адаптер для взаимодействия с converter-module без модификации его кода.
Предполагаемая интеграция:
- converter-module/converter/converter.py запускается как отдельный процесс (например, Docker/Podman или локальный python)
- входной файл должен быть доступен по фиксированному пути /app/input
- выход сохраняется в /app/output/output.<ext> и метаданные в /app/output/output.json
- параметры: --ext, --quality, --custom (список), --trim "start-end"
Данный клиент предоставляет унифицированный async API:
submit_conversion() -> str (task_id)
get_conversion_status(task_id) -> ConversionStatus
download_result(task_id) -> ConversionResult (локальные пути к артефактам)
Реализация по умолчанию использует локальный запуск python-процесса конвертера.
Для контейнеров можно переопределить _build_command/_prepare_io.
"""
def __init__(
self,
converter_entry: str = "converter-module/converter/converter.py",
workdir: str = "converter-module",
io_input_path: str = "/app/input",
io_output_dir: str = "/app/output",
python_bin: str = "python3",
concurrent_limit: int = 2,
) -> None:
self.converter_entry = converter_entry
self.workdir = workdir
self.io_input_path = io_input_path
self.io_output_dir = io_output_dir
self.python_bin = python_bin
self._sem = asyncio.Semaphore(concurrent_limit)
# Локальное состояние задач (простая in-memory мапа процессов)
self._tasks_proc: Dict[str, asyncio.subprocess.Process] = {}
self._tasks_info: Dict[str, Dict[str, Any]] = {} # {task_id: {local_input, local_output_dir, logs_path}}
self._tasks_status: Dict[str, ConversionStatus] = {}
self._tasks_error: Dict[str, str] = {}
os.makedirs(self.workdir, exist_ok=True)
async def submit_conversion(self, task: ConversionTask, local_input_path: str) -> str:
"""
Подготовка окружения и запуск конвертации.
local_input_path — путь к исходному файлу на диске ноды uploader-bot.
"""
task_id = task.task_id or str(uuid.uuid4())
logger.info("Submitting conversion task_id=%s", task_id)
# Готовим IO: копируем/линкуем файл в ожидаемое место converter-module
local_output_dir, logs_path = await self._prepare_io(task_id, local_input_path)
# Формируем команду запуска
cmd = self._build_command(task)
logger.debug("Converter command: %s", " ".join(map(shlex.quote, cmd)))
# Старт процесса
proc = await asyncio.create_subprocess_exec(
*cmd,
cwd=self.workdir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
self._tasks_proc[task_id] = proc
self._tasks_status[task_id] = ConversionStatus.RUNNING
self._tasks_info[task_id] = {
"local_input": local_input_path,
"local_output_dir": local_output_dir,
"logs_path": logs_path,
}
# Запускаем корутину логгирования и ожидания завершения
asyncio.create_task(self._stream_and_wait(task_id, proc, logs_path))
return task_id
async def get_conversion_status(self, task_id: str) -> ConversionStatus:
return self._tasks_status.get(task_id, ConversionStatus.QUEUED)
async def download_result(self, task_id: str) -> ConversionResult:
"""
Возвращает результат: путь к сгенерированному файлу и output.json.
Ничего не копирует, возвращает локальные пути внутри converter-module рабочего каталога.
"""
status = self._tasks_status.get(task_id)
if not status:
return ConversionResult(task_id=task_id, status=ConversionStatus.FAILED, error="unknown task")
info = self._tasks_info.get(task_id, {})
output_dir = info.get("local_output_dir")
logs_path = info.get("logs_path")
if status != ConversionStatus.SUCCESS:
return ConversionResult(task_id=task_id, status=status, logs_path=logs_path, error=self._tasks_error.get(task_id))
# Определяем финальный файл: ищем output.* в каталоге вывода
output_file = await self._detect_output_file(output_dir)
if not output_file:
return ConversionResult(task_id=task_id, status=ConversionStatus.FAILED, logs_path=logs_path, error="output file not found")
return ConversionResult(
task_id=task_id,
status=ConversionStatus.SUCCESS,
converter_output_path=output_file,
logs_path=logs_path,
)
# -------------------- helpers --------------------
async def _prepare_io(self, task_id: str, local_input_path: str) -> Tuple[str, str]:
"""
Подготавливает папки converter-module для запуска и логи.
Мы не можем писать в абсолютные /app/* на хосте, но converter ждёт такие пути.
Поэтому используем симлинки внутри workdir: workdir/app/input -> реальный файл.
"""
# Готовим подкаталоги
app_dir = os.path.join(self.workdir, "app")
os.makedirs(app_dir, exist_ok=True)
linked_input = os.path.join(app_dir, "input")
# Чистим старый симлинк/файл
try:
if os.path.islink(linked_input) or os.path.exists(linked_input):
os.remove(linked_input)
except Exception as e:
logger.warning("Failed to cleanup old input link: %s", e)
# Создаем симлинк на входной файл
os.symlink(os.path.abspath(local_input_path), linked_input)
output_dir = os.path.join(app_dir, "output")
os.makedirs(output_dir, exist_ok=True)
# Очистим выходы
for name in os.listdir(output_dir):
try:
os.remove(os.path.join(output_dir, name))
except Exception:
pass
logs_dir = os.path.join(self.workdir, "logs")
os.makedirs(logs_dir, exist_ok=True)
logs_path = os.path.join(logs_dir, f"{task_id}.log")
# Сопоставляем ожидаемые фиксированные пути converter'а с нашими
# Хотя converter использует /app/input и /app/output, cwd=self.workdir и наличие app/input, app/output достаточно.
return output_dir, logs_path
def _build_command(self, task: ConversionTask) -> List[str]:
cmd: List[str] = [
self.python_bin,
self.converter_entry,
"--ext", task.input_ext,
"--quality", task.quality,
]
if task.custom:
cmd += ["--custom", *task.custom]
if task.trim:
cmd += ["--trim", task.trim]
return cmd
async def _stream_and_wait(self, task_id: str, proc: asyncio.subprocess.Process, logs_path: str) -> None:
"""
Стримит логи процесса в файл и обновляет статус по завершению.
"""
try:
with open(logs_path, "a", encoding="utf-8") as lf:
if proc.stdout:
async for line in proc.stdout:
try:
text = line.decode("utf-8", errors="ignore")
except AttributeError:
text = line
lf.write(text)
lf.flush()
logger.info("[converter %s] %s", task_id, text.strip())
rc = await proc.wait()
if rc == 0:
self._tasks_status[task_id] = ConversionStatus.SUCCESS
else:
self._tasks_status[task_id] = ConversionStatus.FAILED
self._tasks_error[task_id] = f"exit_code={rc}"
except Exception as e:
logger.exception("Converter task %s failed: %s", task_id, e)
self._tasks_status[task_id] = ConversionStatus.FAILED
self._tasks_error[task_id] = str(e)
async def _detect_output_file(self, output_dir: str) -> Optional[str]:
"""
Ищет файл output.* в каталоге результата.
"""
try:
for name in os.listdir(output_dir):
if name.startswith("output."):
return os.path.join(output_dir, name)
if name.startswith("output") and "." in name:
return os.path.join(output_dir, name)
except Exception as e:
logger.error("detect_output_file error: %s", e)
return None