uploader-bot/app/core/background/conversion_daemon.py

94 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import logging
from typing import Optional, Dict
from app.core.converter.conversion_manager import ConversionManager
from app.core.models.converter.conversion_models import ConversionStatus, ConversionResult
logger = logging.getLogger(__name__)
class ConversionDaemon:
"""
Фоновый обработчик очереди конвертации.
Запускает планировщик, мониторит активные задачи и выполняет очистку завершённых.
"""
def __init__(self, manager: Optional[ConversionManager] = None) -> None:
self._manager = manager or ConversionManager()
self._shutdown = asyncio.Event()
self._monitor_interval = 2.0
self._cleanup_interval = 60.0
# локальное состояние для мониторинга
self._last_status: Dict[str, str] = {}
async def process_queue(self) -> None:
"""
Главный цикл планировщика: извлекает задачи из очереди и запускает обработку.
"""
logger.info("ConversionDaemon: starting scheduler loop")
try:
await self._manager.run_scheduler(self._shutdown)
except asyncio.CancelledError:
logger.info("ConversionDaemon: scheduler cancelled")
except Exception as e:
logger.exception("ConversionDaemon: scheduler error: %s", e)
async def monitor_conversions(self) -> None:
"""
Мониторинг статусов задач для логов и метрик.
"""
logger.info("ConversionDaemon: starting monitor loop")
try:
while not self._shutdown.is_set():
# Здесь можно подключить внешний реестр задач, если потребуется
# В текущей реализации ConversionManager хранит результаты локально.
# Логика мониторинга будет простой: статусы будут проверяться по известным task_id,
# которые могли бы сохраняться в каком-либо реестре. Для демо делаем заглушку.
await asyncio.sleep(self._monitor_interval)
except asyncio.CancelledError:
logger.info("ConversionDaemon: monitor cancelled")
except Exception as e:
logger.exception("ConversionDaemon: monitor error: %s", e)
async def cleanup_completed(self) -> None:
"""
Периодическая очистка ресурсов (логи/временные файлы) по завершённым задачам.
"""
logger.info("ConversionDaemon: starting cleanup loop")
try:
while not self._shutdown.is_set():
# В этой версии упрощённо ничего не чистим, т.к. хранение файлов управляется извне.
# Точку расширения оставляем для будущего: удаление временных входных/выходных файлов.
await asyncio.sleep(self._cleanup_interval)
except asyncio.CancelledError:
logger.info("ConversionDaemon: cleanup cancelled")
except Exception as e:
logger.exception("ConversionDaemon: cleanup error: %s", e)
async def run(self) -> None:
"""
Запускает три корутины: планировщик, монитор, очистку.
"""
logger.info("ConversionDaemon: run()")
tasks = [
asyncio.create_task(self.process_queue()),
asyncio.create_task(self.monitor_conversions()),
asyncio.create_task(self.cleanup_completed()),
]
try:
await asyncio.gather(*tasks)
finally:
for t in tasks:
if not t.done():
t.cancel()
def stop(self) -> None:
"""
Инициирует завершение фоновых задач.
"""
logger.info("ConversionDaemon: stop() called")
self._shutdown.set()