uploader-bot/docs/CONVERTER_MODULE.md

28 KiB
Raw Permalink Blame History

MY Network v3.0 - Converter Module и Docker Integration

🔄 Обзор Converter Module

Converter Module представляет собой on-demand систему конвертации медиа файлов, работающую через Docker контейнеры. Модуль запускается только при необходимости обработки файлов и автоматически завершает работу после выполнения задачи.

🐳 Архитектура Docker Integration

Принцип работы

graph TB
    API[MY Network API] --> CM[Converter Manager]
    CM --> DS[Docker Socket]
    DS --> DI[Docker Image: my-converter]
    
    subgraph "On-Demand Containers"
        C1[Converter Container 1]
        C2[Converter Container 2]
        C3[Converter Container N]
    end
    
    DI -.->|Creates| C1
    DI -.->|Creates| C2
    DI -.->|Creates| C3
    
    C1 --> FS[Shared File System]
    C2 --> FS
    C3 --> FS
    
    C1 -.->|Auto-Remove| X1[❌]
    C2 -.->|Auto-Remove| X2[❌]
    C3 -.->|Auto-Remove| X3[❌]

Docker Socket Integration

class DockerConverterManager:
    """Управление converter контейнерами через Docker API"""
    
    def __init__(self, docker_sock_path: str = "/var/run/docker.sock"):
        self.docker_sock_path = docker_sock_path
        self.docker_client = None
        self.converter_image = "my-converter:latest"
        self.max_parallel_jobs = 3
        self.active_jobs = {}
        
    async def initialize(self):
        """Инициализация Docker клиента"""
        try:
            # Проверка доступности Docker socket
            if not os.path.exists(self.docker_sock_path):
                raise DockerError(f"Docker socket not found: {self.docker_sock_path}")
            
            # Создание Docker клиента
            self.docker_client = docker.DockerClient(
                base_url=f"unix://{self.docker_sock_path}"
            )
            
            # Проверка связи с Docker
            await self._test_docker_connection()
            
            # Проверка наличия converter образа
            await self._ensure_converter_image()
            
            logger.info("Docker converter manager initialized successfully")
            
        except Exception as e:
            logger.error(f"Failed to initialize Docker converter manager: {e}")
            raise
    
    async def _test_docker_connection(self):
        """Проверка подключения к Docker"""
        try:
            info = self.docker_client.info()
            logger.info(f"Docker connected: {info['ServerVersion']}")
        except Exception as e:
            raise DockerError(f"Cannot connect to Docker: {e}")
    
    async def _ensure_converter_image(self):
        """Проверка наличия converter образа"""
        try:
            image = self.docker_client.images.get(self.converter_image)
            logger.info(f"Converter image found: {image.id[:12]}")
        except docker.errors.ImageNotFound:
            logger.error(f"Converter image not found: {self.converter_image}")
            raise DockerError(f"Please build converter image: {self.converter_image}")
    
    async def convert_file(self, input_file: str, output_file: str, 
                          conversion_type: str, options: dict = None) -> str:
        """
        Запуск конвертации файла в новом контейнере
        
        Args:
            input_file: Путь к входному файлу
            output_file: Путь к выходному файлу  
            conversion_type: Тип конвертации (video_to_mp4, audio_to_mp3, etc.)
            options: Дополнительные параметры конвертации
            
        Returns:
            str: ID задачи конвертации
        """
        
        # Проверка лимита параллельных задач
        if len(self.active_jobs) >= self.max_parallel_jobs:
            raise ConverterError("Too many active conversion jobs")
        
        # Генерация ID задачи
        job_id = str(uuid4())
        
        # Подготовка окружения для контейнера
        container_config = await self._prepare_container_config(
            job_id, input_file, output_file, conversion_type, options
        )
        
        try:
            # Создание и запуск контейнера
            container = self.docker_client.containers.run(
                image=self.converter_image,
                command=container_config["command"],
                volumes=container_config["volumes"],
                environment=container_config["environment"],
                name=f"converter-{job_id}",
                detach=True,
                auto_remove=True,  # Автоматическое удаление после завершения
                cpu_count=1,       # Ограничение ресурсов
                mem_limit="1g"
            )
            
            # Регистрация активной задачи
            self.active_jobs[job_id] = {
                "container": container,
                "start_time": datetime.utcnow(),
                "input_file": input_file,
                "output_file": output_file,
                "conversion_type": conversion_type,
                "status": "running"
            }
            
            logger.info(f"Conversion job {job_id} started in container {container.id[:12]}")
            
            # Запуск мониторинга задачи в фоне
            asyncio.create_task(self._monitor_conversion_job(job_id))
            
            return job_id
            
        except Exception as e:
            logger.error(f"Failed to start conversion job {job_id}: {e}")
            raise ConverterError(f"Conversion start failed: {e}")
    
    async def _prepare_container_config(self, job_id: str, input_file: str, 
                                       output_file: str, conversion_type: str, 
                                       options: dict) -> dict:
        """Подготовка конфигурации контейнера"""
        
        # Общая папка для файлов между хостом и контейнером
        shared_volume = "/opt/my-network/shared"
        container_input = f"/input/{os.path.basename(input_file)}"
        container_output = f"/output/{os.path.basename(output_file)}"
        
        # Копирование входного файла в shared папку
        shared_input_path = f"{shared_volume}/input/{job_id}_{os.path.basename(input_file)}"
        os.makedirs(os.path.dirname(shared_input_path), exist_ok=True)
        shutil.copy2(input_file, shared_input_path)
        
        # Команда для конвертации
        command = self._build_conversion_command(
            container_input, container_output, conversion_type, options
        )
        
        return {
            "command": command,
            "volumes": {
                shared_volume: {
                    "bind": "/workspace",
                    "mode": "rw"
                }
            },
            "environment": {
                "JOB_ID": job_id,
                "CONVERSION_TYPE": conversion_type,
                "INPUT_FILE": container_input,
                "OUTPUT_FILE": container_output
            }
        }
    
    def _build_conversion_command(self, input_file: str, output_file: str,
                                 conversion_type: str, options: dict) -> List[str]:
        """Построение команды конвертации"""
        
        conversion_commands = {
            "video_to_mp4": [
                "ffmpeg", "-i", input_file,
                "-c:v", "libx264", "-c:a", "aac",
                "-preset", "medium", "-crf", "23",
                output_file
            ],
            "audio_to_mp3": [
                "ffmpeg", "-i", input_file,
                "-c:a", "libmp3lame", "-b:a", "192k",
                output_file
            ],
            "image_resize": [
                "convert", input_file,
                "-resize", options.get("size", "800x600"),
                "-quality", str(options.get("quality", 85)),
                output_file
            ],
            "extract_thumbnail": [
                "ffmpeg", "-i", input_file,
                "-ss", "00:00:01", "-frames:v", "1",
                "-q:v", "2", output_file
            ]
        }
        
        if conversion_type not in conversion_commands:
            raise ConverterError(f"Unsupported conversion type: {conversion_type}")
        
        return conversion_commands[conversion_type]
    
    async def _monitor_conversion_job(self, job_id: str):
        """Мониторинг выполнения задачи конвертации"""
        
        if job_id not in self.active_jobs:
            return
        
        job = self.active_jobs[job_id]
        container = job["container"]
        
        try:
            # Ожидание завершения контейнера
            result = container.wait(timeout=300)  # 5 минут таймаут
            
            # Получение логов
            logs = container.logs().decode('utf-8')
            
            # Обновление статуса задачи
            if result["StatusCode"] == 0:
                job["status"] = "completed"
                job["end_time"] = datetime.utcnow()
                
                # Перемещение выходного файла
                await self._handle_conversion_success(job_id)
                
                logger.info(f"Conversion job {job_id} completed successfully")
            else:
                job["status"] = "failed"
                job["error"] = f"Container exited with code {result['StatusCode']}"
                job["logs"] = logs
                
                logger.error(f"Conversion job {job_id} failed: {job['error']}")
            
        except docker.errors.ContainerError as e:
            job["status"] = "failed"
            job["error"] = str(e)
            logger.error(f"Conversion job {job_id} container error: {e}")
            
        except Exception as e:
            job["status"] = "failed"
            job["error"] = str(e)
            logger.error(f"Conversion job {job_id} monitoring error: {e}")
        
        finally:
            # Очистка ресурсов
            await self._cleanup_conversion_job(job_id)
    
    async def _handle_conversion_success(self, job_id: str):
        """Обработка успешного завершения конвертации"""
        
        job = self.active_jobs[job_id]
        shared_volume = "/opt/my-network/shared"
        
        # Путь к выходному файлу в shared папке
        shared_output_path = f"{shared_volume}/output/{job_id}_{os.path.basename(job['output_file'])}"
        
        if os.path.exists(shared_output_path):
            # Перемещение файла в целевое расположение
            os.makedirs(os.path.dirname(job["output_file"]), exist_ok=True)
            shutil.move(shared_output_path, job["output_file"])
            
            job["output_size"] = os.path.getsize(job["output_file"])
            logger.info(f"Output file moved to: {job['output_file']}")
        else:
            job["status"] = "failed"
            job["error"] = "Output file not found after conversion"
    
    async def _cleanup_conversion_job(self, job_id: str):
        """Очистка ресурсов после завершения задачи"""
        
        if job_id not in self.active_jobs:
            return
        
        job = self.active_jobs[job_id]
        shared_volume = "/opt/my-network/shared"
        
        # Удаление временных файлов
        input_cleanup = f"{shared_volume}/input/{job_id}_*"
        output_cleanup = f"{shared_volume}/output/{job_id}_*"
        
        for pattern in [input_cleanup, output_cleanup]:
            for file_path in glob.glob(pattern):
                try:
                    os.remove(file_path)
                except OSError:
                    pass
        
        # Удаление задачи из активных (через некоторое время для логов)
        await asyncio.sleep(60)  # Сохраняем информацию 1 минуту
        if job_id in self.active_jobs:
            del self.active_jobs[job_id]
    
    async def get_job_status(self, job_id: str) -> dict:
        """Получение статуса задачи конвертации"""
        
        if job_id not in self.active_jobs:
            return {"status": "not_found", "error": "Job not found"}
        
        job = self.active_jobs[job_id]
        
        status_info = {
            "job_id": job_id,
            "status": job["status"],
            "conversion_type": job["conversion_type"],
            "start_time": job["start_time"].isoformat(),
            "input_file": job["input_file"],
            "output_file": job["output_file"]
        }
        
        if "end_time" in job:
            status_info["end_time"] = job["end_time"].isoformat()
            duration = (job["end_time"] - job["start_time"]).total_seconds()
            status_info["duration_seconds"] = duration
        
        if "output_size" in job:
            status_info["output_size"] = job["output_size"]
        
        if "error" in job:
            status_info["error"] = job["error"]
        
        if "logs" in job:
            status_info["logs"] = job["logs"]
        
        return status_info
    
    async def cancel_job(self, job_id: str) -> bool:
        """Отмена задачи конвертации"""
        
        if job_id not in self.active_jobs:
            return False
        
        job = self.active_jobs[job_id]
        container = job["container"]
        
        try:
            # Остановка контейнера
            container.stop(timeout=10)
            job["status"] = "cancelled"
            job["end_time"] = datetime.utcnow()
            
            logger.info(f"Conversion job {job_id} cancelled")
            return True
            
        except Exception as e:
            logger.error(f"Failed to cancel job {job_id}: {e}")
            return False
    
    async def get_active_jobs(self) -> List[dict]:
        """Получение списка активных задач"""
        
        active_jobs = []
        
        for job_id, job in self.active_jobs.items():
            if job["status"] == "running":
                active_jobs.append({
                    "job_id": job_id,
                    "conversion_type": job["conversion_type"],
                    "start_time": job["start_time"].isoformat(),
                    "duration_seconds": (datetime.utcnow() - job["start_time"]).total_seconds()
                })
        
        return active_jobs

🔧 Docker Compose конфигурация

Обновленный docker-compose.yml

version: '3.8'

services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "15100:15100"
    volumes:
      # Доступ к Docker socket для управления converter контейнерами
      - ${DOCKER_SOCK_PATH:-/var/run/docker.sock}:/var/run/docker.sock
      # Общая папка для обмена файлами с converter контейнерами
      - ./shared:/opt/my-network/shared
      # Хранилище контента
      - ./storage:/opt/my-network/storage
    environment:
      - DOCKER_SOCK_PATH=/var/run/docker.sock
      - CONVERTER_ENABLED=true
      - CONVERTER_MAX_PARALLEL=3
    depends_on:
      - postgres
      - redis
    networks:
      - my-network

  # Converter образ (только для сборки, не запускается)
  converter-module:
    build:
      context: ./converter-module
      dockerfile: Dockerfile
    image: my-converter:latest
    profiles:
      - build-only  # Не запускается в обычном режиме

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - my-network

  redis:
    image: redis:7-alpine
    networks:
      - my-network

volumes:
  postgres_data:

networks:
  my-network:
    driver: bridge

📊 API для управления конвертацией

REST API endpoints

@app.route('/api/v1/converter/convert', methods=['POST'])
async def start_conversion(request):
    """Запуск задачи конвертации"""
    
    data = request.json
    
    required_fields = ['input_file', 'output_file', 'conversion_type']
    for field in required_fields:
        if field not in data:
            return response.json({'error': f'Missing required field: {field}'}, status=400)
    
    try:
        job_id = await converter_manager.convert_file(
            input_file=data['input_file'],
            output_file=data['output_file'],
            conversion_type=data['conversion_type'],
            options=data.get('options', {})
        )
        
        return response.json({
            'job_id': job_id,
            'status': 'started',
            'message': 'Conversion job started successfully'
        })
        
    except ConverterError as e:
        return response.json({'error': str(e)}, status=400)

@app.route('/api/v1/converter/status/<job_id>', methods=['GET'])
async def get_conversion_status(request, job_id):
    """Получение статуса задачи конвертации"""
    
    status = await converter_manager.get_job_status(job_id)
    return response.json(status)

@app.route('/api/v1/converter/cancel/<job_id>', methods=['POST'])
async def cancel_conversion(request, job_id):
    """Отмена задачи конвертации"""
    
    cancelled = await converter_manager.cancel_job(job_id)
    
    if cancelled:
        return response.json({'message': 'Job cancelled successfully'})
    else:
        return response.json({'error': 'Failed to cancel job or job not found'}, status=404)

@app.route('/api/v1/converter/active', methods=['GET'])
async def get_active_conversions(request):
    """Список активных задач конвертации"""
    
    active_jobs = await converter_manager.get_active_jobs()
    return response.json({
        'active_jobs': active_jobs,
        'total_active': len(active_jobs)
    })

@app.route('/api/v1/converter/stats', methods=['GET'])
async def get_converter_stats(request):
    """Статистика работы конвертера"""
    
    return response.json({
        'max_parallel_jobs': converter_manager.max_parallel_jobs,
        'active_jobs_count': len(converter_manager.active_jobs),
        'docker_sock_path': converter_manager.docker_sock_path,
        'converter_image': converter_manager.converter_image,
        'supported_conversions': [
            'video_to_mp4',
            'audio_to_mp3', 
            'image_resize',
            'extract_thumbnail'
        ]
    })

🛠️ Converter Module Dockerfile

converter-module/Dockerfile

FROM ubuntu:22.04

# Установка зависимостей
RUN apt-get update && apt-get install -y \
    ffmpeg \
    imagemagick \
    python3 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Установка Python зависимостей
RUN pip3 install \
    pillow \
    opencv-python

# Создание рабочей директории
WORKDIR /workspace

# Копирование скриптов конвертации
COPY scripts/ /usr/local/bin/
RUN chmod +x /usr/local/bin/*

# Создание директорий для файлов
RUN mkdir -p /input /output

# Точка входа
ENTRYPOINT ["/usr/local/bin/convert.sh"]

converter-module/scripts/convert.sh

#!/bin/bash

set -e

echo "Starting conversion job: $JOB_ID"
echo "Conversion type: $CONVERSION_TYPE"
echo "Input file: $INPUT_FILE"
echo "Output file: $OUTPUT_FILE"

# Функция логирования
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# Проверка входного файла
if [ ! -f "/workspace$INPUT_FILE" ]; then
    log "ERROR: Input file not found: $INPUT_FILE"
    exit 1
fi

# Создание выходной директории
mkdir -p "$(dirname "/workspace$OUTPUT_FILE")"

# Выполнение конвертации в зависимости от типа
case "$CONVERSION_TYPE" in
    "video_to_mp4")
        log "Converting video to MP4..."
        ffmpeg -i "/workspace$INPUT_FILE" \
               -c:v libx264 -c:a aac \
               -preset medium -crf 23 \
               "/workspace$OUTPUT_FILE"
        ;;
    
    "audio_to_mp3")
        log "Converting audio to MP3..."
        ffmpeg -i "/workspace$INPUT_FILE" \
               -c:a libmp3lame -b:a 192k \
               "/workspace$OUTPUT_FILE"
        ;;
    
    "image_resize")
        log "Resizing image..."
        convert "/workspace$INPUT_FILE" \
                -resize 800x600 \
                -quality 85 \
                "/workspace$OUTPUT_FILE"
        ;;
    
    "extract_thumbnail")
        log "Extracting thumbnail..."
        ffmpeg -i "/workspace$INPUT_FILE" \
               -ss 00:00:01 -frames:v 1 \
               -q:v 2 \
               "/workspace$OUTPUT_FILE"
        ;;
    
    *)
        log "ERROR: Unsupported conversion type: $CONVERSION_TYPE"
        exit 1
        ;;
esac

# Проверка результата
if [ -f "/workspace$OUTPUT_FILE" ]; then
    log "Conversion completed successfully"
    log "Output file size: $(du -h "/workspace$OUTPUT_FILE" | cut -f1)"
    exit 0
else
    log "ERROR: Output file was not created"
    exit 1
fi

🔍 Мониторинг и логирование

Логирование converter операций

class ConverterLogger:
    """Специализированная система логирования для converter операций"""
    
    def __init__(self):
        self.logger = logging.getLogger('converter')
        self.converter_metrics = {
            'total_jobs': 0,
            'successful_jobs': 0,
            'failed_jobs': 0,
            'total_processing_time': 0
        }
    
    def log_job_start(self, job_id: str, conversion_type: str, input_file: str):
        """Логирование начала задачи"""
        self.logger.info({
            'event': 'job_started',
            'job_id': job_id,
            'conversion_type': conversion_type,
            'input_file': os.path.basename(input_file),
            'timestamp': datetime.utcnow().isoformat()
        })
        self.converter_metrics['total_jobs'] += 1
    
    def log_job_completion(self, job_id: str, success: bool, duration: float, output_size: int = None):
        """Логирование завершения задачи"""
        
        if success:
            self.converter_metrics['successful_jobs'] += 1
            event = 'job_completed'
        else:
            self.converter_metrics['failed_jobs'] += 1
            event = 'job_failed'
        
        self.converter_metrics['total_processing_time'] += duration
        
        log_data = {
            'event': event,
            'job_id': job_id,
            'duration_seconds': duration,
            'timestamp': datetime.utcnow().isoformat()
        }
        
        if output_size:
            log_data['output_size_bytes'] = output_size
        
        self.logger.info(log_data)
    
    def get_metrics(self) -> dict:
        """Получение метрик converter модуля"""
        
        total_jobs = self.converter_metrics['total_jobs']
        
        return {
            'total_jobs': total_jobs,
            'successful_jobs': self.converter_metrics['successful_jobs'],
            'failed_jobs': self.converter_metrics['failed_jobs'],
            'success_rate': self.converter_metrics['successful_jobs'] / total_jobs if total_jobs > 0 else 0,
            'average_processing_time': self.converter_metrics['total_processing_time'] / total_jobs if total_jobs > 0 else 0
        }

🚀 Интеграция с MY Network

Автоматическая конвертация при загрузке контента

class ContentProcessingPipeline:
    """Конвейер обработки контента с автоматической конвертацией"""
    
    async def process_uploaded_content(self, content: StoredContent):
        """Обработка загруженного контента"""
        
        # Определение типа контента и необходимых конвертаций
        conversions_needed = self._determine_conversions(content)
        
        for conversion in conversions_needed:
            try:
                job_id = await converter_manager.convert_file(
                    input_file=content.file_path,
                    output_file=conversion['output_path'],
                    conversion_type=conversion['type'],
                    options=conversion.get('options', {})
                )
                
                # Сохранение связи между контентом и задачей конвертации
                await self._link_content_conversion(content.id, job_id, conversion['type'])
                
            except ConverterError as e:
                logger.error(f"Failed to start conversion for content {content.id}: {e}")
    
    def _determine_conversions(self, content: StoredContent) -> List[dict]:
        """Определение необходимых конвертаций"""
        
        conversions = []
        
        if content.content_type.startswith('video/'):
            # Для видео создаем preview и thumbnail
            conversions.extend([
                {
                    'type': 'video_to_mp4',
                    'output_path': content.file_path.replace('.', '_preview.') + 'mp4',
                    'options': {'quality': 'medium', 'resolution': '720p'}
                },
                {
                    'type': 'extract_thumbnail',
                    'output_path': content.file_path.replace('.', '_thumb.') + 'jpg'
                }
            ])
        
        elif content.content_type.startswith('audio/'):
            # Для аудио создаем preview версию
            conversions.append({
                'type': 'audio_to_mp3',
                'output_path': content.file_path.replace('.', '_preview.') + 'mp3',
                'options': {'bitrate': '128k', 'duration': '30'}  # 30 секунд preview
            })
        
        elif content.content_type.startswith('image/'):
            # Для изображений создаем thumbnail
            conversions.append({
                'type': 'image_resize',
                'output_path': content.file_path.replace('.', '_thumb.') + 'jpg',
                'options': {'size': '300x300', 'quality': 80}
            })
        
        return conversions

Эта система обеспечивает гибкую и эффективную обработку медиа файлов с полной интеграцией в MY Network архитектуру.