# MY Network v3.0 - Converter Module и Docker Integration ## 🔄 Обзор Converter Module Converter Module представляет собой on-demand систему конвертации медиа файлов, работающую через Docker контейнеры. Модуль запускается только при необходимости обработки файлов и автоматически завершает работу после выполнения задачи. ## 🐳 Архитектура Docker Integration ### Принцип работы ```mermaid graph TB API[MY Network API] --> CM[Converter Manager] CM --> DS[Docker Socket] DS --> DI[Docker Image: my-converter] subgraph "On-Demand Containers" C1[Converter Container 1] C2[Converter Container 2] C3[Converter Container N] end DI -.->|Creates| C1 DI -.->|Creates| C2 DI -.->|Creates| C3 C1 --> FS[Shared File System] C2 --> FS C3 --> FS C1 -.->|Auto-Remove| X1[❌] C2 -.->|Auto-Remove| X2[❌] C3 -.->|Auto-Remove| X3[❌] ``` ### Docker Socket Integration ```python class DockerConverterManager: """Управление converter контейнерами через Docker API""" def __init__(self, docker_sock_path: str = "/var/run/docker.sock"): self.docker_sock_path = docker_sock_path self.docker_client = None self.converter_image = "my-converter:latest" self.max_parallel_jobs = 3 self.active_jobs = {} async def initialize(self): """Инициализация Docker клиента""" try: # Проверка доступности Docker socket if not os.path.exists(self.docker_sock_path): raise DockerError(f"Docker socket not found: {self.docker_sock_path}") # Создание Docker клиента self.docker_client = docker.DockerClient( base_url=f"unix://{self.docker_sock_path}" ) # Проверка связи с Docker await self._test_docker_connection() # Проверка наличия converter образа await self._ensure_converter_image() logger.info("Docker converter manager initialized successfully") except Exception as e: logger.error(f"Failed to initialize Docker converter manager: {e}") raise async def _test_docker_connection(self): """Проверка подключения к Docker""" try: info = self.docker_client.info() logger.info(f"Docker connected: {info['ServerVersion']}") except Exception as e: raise DockerError(f"Cannot connect to Docker: {e}") async def _ensure_converter_image(self): """Проверка наличия converter образа""" try: image = self.docker_client.images.get(self.converter_image) logger.info(f"Converter image found: {image.id[:12]}") except docker.errors.ImageNotFound: logger.error(f"Converter image not found: {self.converter_image}") raise DockerError(f"Please build converter image: {self.converter_image}") async def convert_file(self, input_file: str, output_file: str, conversion_type: str, options: dict = None) -> str: """ Запуск конвертации файла в новом контейнере Args: input_file: Путь к входному файлу output_file: Путь к выходному файлу conversion_type: Тип конвертации (video_to_mp4, audio_to_mp3, etc.) options: Дополнительные параметры конвертации Returns: str: ID задачи конвертации """ # Проверка лимита параллельных задач if len(self.active_jobs) >= self.max_parallel_jobs: raise ConverterError("Too many active conversion jobs") # Генерация ID задачи job_id = str(uuid4()) # Подготовка окружения для контейнера container_config = await self._prepare_container_config( job_id, input_file, output_file, conversion_type, options ) try: # Создание и запуск контейнера container = self.docker_client.containers.run( image=self.converter_image, command=container_config["command"], volumes=container_config["volumes"], environment=container_config["environment"], name=f"converter-{job_id}", detach=True, auto_remove=True, # Автоматическое удаление после завершения cpu_count=1, # Ограничение ресурсов mem_limit="1g" ) # Регистрация активной задачи self.active_jobs[job_id] = { "container": container, "start_time": datetime.utcnow(), "input_file": input_file, "output_file": output_file, "conversion_type": conversion_type, "status": "running" } logger.info(f"Conversion job {job_id} started in container {container.id[:12]}") # Запуск мониторинга задачи в фоне asyncio.create_task(self._monitor_conversion_job(job_id)) return job_id except Exception as e: logger.error(f"Failed to start conversion job {job_id}: {e}") raise ConverterError(f"Conversion start failed: {e}") async def _prepare_container_config(self, job_id: str, input_file: str, output_file: str, conversion_type: str, options: dict) -> dict: """Подготовка конфигурации контейнера""" # Общая папка для файлов между хостом и контейнером shared_volume = "/opt/my-network/shared" container_input = f"/input/{os.path.basename(input_file)}" container_output = f"/output/{os.path.basename(output_file)}" # Копирование входного файла в shared папку shared_input_path = f"{shared_volume}/input/{job_id}_{os.path.basename(input_file)}" os.makedirs(os.path.dirname(shared_input_path), exist_ok=True) shutil.copy2(input_file, shared_input_path) # Команда для конвертации command = self._build_conversion_command( container_input, container_output, conversion_type, options ) return { "command": command, "volumes": { shared_volume: { "bind": "/workspace", "mode": "rw" } }, "environment": { "JOB_ID": job_id, "CONVERSION_TYPE": conversion_type, "INPUT_FILE": container_input, "OUTPUT_FILE": container_output } } def _build_conversion_command(self, input_file: str, output_file: str, conversion_type: str, options: dict) -> List[str]: """Построение команды конвертации""" conversion_commands = { "video_to_mp4": [ "ffmpeg", "-i", input_file, "-c:v", "libx264", "-c:a", "aac", "-preset", "medium", "-crf", "23", output_file ], "audio_to_mp3": [ "ffmpeg", "-i", input_file, "-c:a", "libmp3lame", "-b:a", "192k", output_file ], "image_resize": [ "convert", input_file, "-resize", options.get("size", "800x600"), "-quality", str(options.get("quality", 85)), output_file ], "extract_thumbnail": [ "ffmpeg", "-i", input_file, "-ss", "00:00:01", "-frames:v", "1", "-q:v", "2", output_file ] } if conversion_type not in conversion_commands: raise ConverterError(f"Unsupported conversion type: {conversion_type}") return conversion_commands[conversion_type] async def _monitor_conversion_job(self, job_id: str): """Мониторинг выполнения задачи конвертации""" if job_id not in self.active_jobs: return job = self.active_jobs[job_id] container = job["container"] try: # Ожидание завершения контейнера result = container.wait(timeout=300) # 5 минут таймаут # Получение логов logs = container.logs().decode('utf-8') # Обновление статуса задачи if result["StatusCode"] == 0: job["status"] = "completed" job["end_time"] = datetime.utcnow() # Перемещение выходного файла await self._handle_conversion_success(job_id) logger.info(f"Conversion job {job_id} completed successfully") else: job["status"] = "failed" job["error"] = f"Container exited with code {result['StatusCode']}" job["logs"] = logs logger.error(f"Conversion job {job_id} failed: {job['error']}") except docker.errors.ContainerError as e: job["status"] = "failed" job["error"] = str(e) logger.error(f"Conversion job {job_id} container error: {e}") except Exception as e: job["status"] = "failed" job["error"] = str(e) logger.error(f"Conversion job {job_id} monitoring error: {e}") finally: # Очистка ресурсов await self._cleanup_conversion_job(job_id) async def _handle_conversion_success(self, job_id: str): """Обработка успешного завершения конвертации""" job = self.active_jobs[job_id] shared_volume = "/opt/my-network/shared" # Путь к выходному файлу в shared папке shared_output_path = f"{shared_volume}/output/{job_id}_{os.path.basename(job['output_file'])}" if os.path.exists(shared_output_path): # Перемещение файла в целевое расположение os.makedirs(os.path.dirname(job["output_file"]), exist_ok=True) shutil.move(shared_output_path, job["output_file"]) job["output_size"] = os.path.getsize(job["output_file"]) logger.info(f"Output file moved to: {job['output_file']}") else: job["status"] = "failed" job["error"] = "Output file not found after conversion" async def _cleanup_conversion_job(self, job_id: str): """Очистка ресурсов после завершения задачи""" if job_id not in self.active_jobs: return job = self.active_jobs[job_id] shared_volume = "/opt/my-network/shared" # Удаление временных файлов input_cleanup = f"{shared_volume}/input/{job_id}_*" output_cleanup = f"{shared_volume}/output/{job_id}_*" for pattern in [input_cleanup, output_cleanup]: for file_path in glob.glob(pattern): try: os.remove(file_path) except OSError: pass # Удаление задачи из активных (через некоторое время для логов) await asyncio.sleep(60) # Сохраняем информацию 1 минуту if job_id in self.active_jobs: del self.active_jobs[job_id] async def get_job_status(self, job_id: str) -> dict: """Получение статуса задачи конвертации""" if job_id not in self.active_jobs: return {"status": "not_found", "error": "Job not found"} job = self.active_jobs[job_id] status_info = { "job_id": job_id, "status": job["status"], "conversion_type": job["conversion_type"], "start_time": job["start_time"].isoformat(), "input_file": job["input_file"], "output_file": job["output_file"] } if "end_time" in job: status_info["end_time"] = job["end_time"].isoformat() duration = (job["end_time"] - job["start_time"]).total_seconds() status_info["duration_seconds"] = duration if "output_size" in job: status_info["output_size"] = job["output_size"] if "error" in job: status_info["error"] = job["error"] if "logs" in job: status_info["logs"] = job["logs"] return status_info async def cancel_job(self, job_id: str) -> bool: """Отмена задачи конвертации""" if job_id not in self.active_jobs: return False job = self.active_jobs[job_id] container = job["container"] try: # Остановка контейнера container.stop(timeout=10) job["status"] = "cancelled" job["end_time"] = datetime.utcnow() logger.info(f"Conversion job {job_id} cancelled") return True except Exception as e: logger.error(f"Failed to cancel job {job_id}: {e}") return False async def get_active_jobs(self) -> List[dict]: """Получение списка активных задач""" active_jobs = [] for job_id, job in self.active_jobs.items(): if job["status"] == "running": active_jobs.append({ "job_id": job_id, "conversion_type": job["conversion_type"], "start_time": job["start_time"].isoformat(), "duration_seconds": (datetime.utcnow() - job["start_time"]).total_seconds() }) return active_jobs ``` ## 🔧 Docker Compose конфигурация ### Обновленный docker-compose.yml ```yaml version: '3.8' services: app: build: context: . dockerfile: Dockerfile ports: - "15100:15100" volumes: # Доступ к Docker socket для управления converter контейнерами - ${DOCKER_SOCK_PATH:-/var/run/docker.sock}:/var/run/docker.sock # Общая папка для обмена файлами с converter контейнерами - ./shared:/opt/my-network/shared # Хранилище контента - ./storage:/opt/my-network/storage environment: - DOCKER_SOCK_PATH=/var/run/docker.sock - CONVERTER_ENABLED=true - CONVERTER_MAX_PARALLEL=3 depends_on: - postgres - redis networks: - my-network # Converter образ (только для сборки, не запускается) converter-module: build: context: ./converter-module dockerfile: Dockerfile image: my-converter:latest profiles: - build-only # Не запускается в обычном режиме postgres: image: postgres:15 environment: POSTGRES_DB: ${POSTGRES_DB} POSTGRES_USER: ${POSTGRES_USER} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} volumes: - postgres_data:/var/lib/postgresql/data networks: - my-network redis: image: redis:7-alpine networks: - my-network volumes: postgres_data: networks: my-network: driver: bridge ``` ## 📊 API для управления конвертацией ### REST API endpoints ```python @app.route('/api/v1/converter/convert', methods=['POST']) async def start_conversion(request): """Запуск задачи конвертации""" data = request.json required_fields = ['input_file', 'output_file', 'conversion_type'] for field in required_fields: if field not in data: return response.json({'error': f'Missing required field: {field}'}, status=400) try: job_id = await converter_manager.convert_file( input_file=data['input_file'], output_file=data['output_file'], conversion_type=data['conversion_type'], options=data.get('options', {}) ) return response.json({ 'job_id': job_id, 'status': 'started', 'message': 'Conversion job started successfully' }) except ConverterError as e: return response.json({'error': str(e)}, status=400) @app.route('/api/v1/converter/status/', methods=['GET']) async def get_conversion_status(request, job_id): """Получение статуса задачи конвертации""" status = await converter_manager.get_job_status(job_id) return response.json(status) @app.route('/api/v1/converter/cancel/', methods=['POST']) async def cancel_conversion(request, job_id): """Отмена задачи конвертации""" cancelled = await converter_manager.cancel_job(job_id) if cancelled: return response.json({'message': 'Job cancelled successfully'}) else: return response.json({'error': 'Failed to cancel job or job not found'}, status=404) @app.route('/api/v1/converter/active', methods=['GET']) async def get_active_conversions(request): """Список активных задач конвертации""" active_jobs = await converter_manager.get_active_jobs() return response.json({ 'active_jobs': active_jobs, 'total_active': len(active_jobs) }) @app.route('/api/v1/converter/stats', methods=['GET']) async def get_converter_stats(request): """Статистика работы конвертера""" return response.json({ 'max_parallel_jobs': converter_manager.max_parallel_jobs, 'active_jobs_count': len(converter_manager.active_jobs), 'docker_sock_path': converter_manager.docker_sock_path, 'converter_image': converter_manager.converter_image, 'supported_conversions': [ 'video_to_mp4', 'audio_to_mp3', 'image_resize', 'extract_thumbnail' ] }) ``` ## 🛠️ Converter Module Dockerfile ### converter-module/Dockerfile ```dockerfile FROM ubuntu:22.04 # Установка зависимостей RUN apt-get update && apt-get install -y \ ffmpeg \ imagemagick \ python3 \ python3-pip \ && rm -rf /var/lib/apt/lists/* # Установка Python зависимостей RUN pip3 install \ pillow \ opencv-python # Создание рабочей директории WORKDIR /workspace # Копирование скриптов конвертации COPY scripts/ /usr/local/bin/ RUN chmod +x /usr/local/bin/* # Создание директорий для файлов RUN mkdir -p /input /output # Точка входа ENTRYPOINT ["/usr/local/bin/convert.sh"] ``` ### converter-module/scripts/convert.sh ```bash #!/bin/bash set -e echo "Starting conversion job: $JOB_ID" echo "Conversion type: $CONVERSION_TYPE" echo "Input file: $INPUT_FILE" echo "Output file: $OUTPUT_FILE" # Функция логирования log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" } # Проверка входного файла if [ ! -f "/workspace$INPUT_FILE" ]; then log "ERROR: Input file not found: $INPUT_FILE" exit 1 fi # Создание выходной директории mkdir -p "$(dirname "/workspace$OUTPUT_FILE")" # Выполнение конвертации в зависимости от типа case "$CONVERSION_TYPE" in "video_to_mp4") log "Converting video to MP4..." ffmpeg -i "/workspace$INPUT_FILE" \ -c:v libx264 -c:a aac \ -preset medium -crf 23 \ "/workspace$OUTPUT_FILE" ;; "audio_to_mp3") log "Converting audio to MP3..." ffmpeg -i "/workspace$INPUT_FILE" \ -c:a libmp3lame -b:a 192k \ "/workspace$OUTPUT_FILE" ;; "image_resize") log "Resizing image..." convert "/workspace$INPUT_FILE" \ -resize 800x600 \ -quality 85 \ "/workspace$OUTPUT_FILE" ;; "extract_thumbnail") log "Extracting thumbnail..." ffmpeg -i "/workspace$INPUT_FILE" \ -ss 00:00:01 -frames:v 1 \ -q:v 2 \ "/workspace$OUTPUT_FILE" ;; *) log "ERROR: Unsupported conversion type: $CONVERSION_TYPE" exit 1 ;; esac # Проверка результата if [ -f "/workspace$OUTPUT_FILE" ]; then log "Conversion completed successfully" log "Output file size: $(du -h "/workspace$OUTPUT_FILE" | cut -f1)" exit 0 else log "ERROR: Output file was not created" exit 1 fi ``` ## 🔍 Мониторинг и логирование ### Логирование converter операций ```python class ConverterLogger: """Специализированная система логирования для converter операций""" def __init__(self): self.logger = logging.getLogger('converter') self.converter_metrics = { 'total_jobs': 0, 'successful_jobs': 0, 'failed_jobs': 0, 'total_processing_time': 0 } def log_job_start(self, job_id: str, conversion_type: str, input_file: str): """Логирование начала задачи""" self.logger.info({ 'event': 'job_started', 'job_id': job_id, 'conversion_type': conversion_type, 'input_file': os.path.basename(input_file), 'timestamp': datetime.utcnow().isoformat() }) self.converter_metrics['total_jobs'] += 1 def log_job_completion(self, job_id: str, success: bool, duration: float, output_size: int = None): """Логирование завершения задачи""" if success: self.converter_metrics['successful_jobs'] += 1 event = 'job_completed' else: self.converter_metrics['failed_jobs'] += 1 event = 'job_failed' self.converter_metrics['total_processing_time'] += duration log_data = { 'event': event, 'job_id': job_id, 'duration_seconds': duration, 'timestamp': datetime.utcnow().isoformat() } if output_size: log_data['output_size_bytes'] = output_size self.logger.info(log_data) def get_metrics(self) -> dict: """Получение метрик converter модуля""" total_jobs = self.converter_metrics['total_jobs'] return { 'total_jobs': total_jobs, 'successful_jobs': self.converter_metrics['successful_jobs'], 'failed_jobs': self.converter_metrics['failed_jobs'], 'success_rate': self.converter_metrics['successful_jobs'] / total_jobs if total_jobs > 0 else 0, 'average_processing_time': self.converter_metrics['total_processing_time'] / total_jobs if total_jobs > 0 else 0 } ``` ## 🚀 Интеграция с MY Network ### Автоматическая конвертация при загрузке контента ```python class ContentProcessingPipeline: """Конвейер обработки контента с автоматической конвертацией""" async def process_uploaded_content(self, content: StoredContent): """Обработка загруженного контента""" # Определение типа контента и необходимых конвертаций conversions_needed = self._determine_conversions(content) for conversion in conversions_needed: try: job_id = await converter_manager.convert_file( input_file=content.file_path, output_file=conversion['output_path'], conversion_type=conversion['type'], options=conversion.get('options', {}) ) # Сохранение связи между контентом и задачей конвертации await self._link_content_conversion(content.id, job_id, conversion['type']) except ConverterError as e: logger.error(f"Failed to start conversion for content {content.id}: {e}") def _determine_conversions(self, content: StoredContent) -> List[dict]: """Определение необходимых конвертаций""" conversions = [] if content.content_type.startswith('video/'): # Для видео создаем preview и thumbnail conversions.extend([ { 'type': 'video_to_mp4', 'output_path': content.file_path.replace('.', '_preview.') + 'mp4', 'options': {'quality': 'medium', 'resolution': '720p'} }, { 'type': 'extract_thumbnail', 'output_path': content.file_path.replace('.', '_thumb.') + 'jpg' } ]) elif content.content_type.startswith('audio/'): # Для аудио создаем preview версию conversions.append({ 'type': 'audio_to_mp3', 'output_path': content.file_path.replace('.', '_preview.') + 'mp3', 'options': {'bitrate': '128k', 'duration': '30'} # 30 секунд preview }) elif content.content_type.startswith('image/'): # Для изображений создаем thumbnail conversions.append({ 'type': 'image_resize', 'output_path': content.file_path.replace('.', '_thumb.') + 'jpg', 'options': {'size': '300x300', 'quality': 80} }) return conversions ``` Эта система обеспечивает гибкую и эффективную обработку медиа файлов с полной интеграцией в MY Network архитектуру.