28 KiB
28 KiB
MY Network v3.0 - Converter Module и Docker Integration
🔄 Обзор Converter Module
Converter Module представляет собой on-demand систему конвертации медиа файлов, работающую через Docker контейнеры. Модуль запускается только при необходимости обработки файлов и автоматически завершает работу после выполнения задачи.
🐳 Архитектура Docker Integration
Принцип работы
graph TB
API[MY Network API] --> CM[Converter Manager]
CM --> DS[Docker Socket]
DS --> DI[Docker Image: my-converter]
subgraph "On-Demand Containers"
C1[Converter Container 1]
C2[Converter Container 2]
C3[Converter Container N]
end
DI -.->|Creates| C1
DI -.->|Creates| C2
DI -.->|Creates| C3
C1 --> FS[Shared File System]
C2 --> FS
C3 --> FS
C1 -.->|Auto-Remove| X1[❌]
C2 -.->|Auto-Remove| X2[❌]
C3 -.->|Auto-Remove| X3[❌]
Docker Socket Integration
class DockerConverterManager:
"""Управление converter контейнерами через Docker API"""
def __init__(self, docker_sock_path: str = "/var/run/docker.sock"):
self.docker_sock_path = docker_sock_path
self.docker_client = None
self.converter_image = "my-converter:latest"
self.max_parallel_jobs = 3
self.active_jobs = {}
async def initialize(self):
"""Инициализация Docker клиента"""
try:
# Проверка доступности Docker socket
if not os.path.exists(self.docker_sock_path):
raise DockerError(f"Docker socket not found: {self.docker_sock_path}")
# Создание Docker клиента
self.docker_client = docker.DockerClient(
base_url=f"unix://{self.docker_sock_path}"
)
# Проверка связи с Docker
await self._test_docker_connection()
# Проверка наличия converter образа
await self._ensure_converter_image()
logger.info("Docker converter manager initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Docker converter manager: {e}")
raise
async def _test_docker_connection(self):
"""Проверка подключения к Docker"""
try:
info = self.docker_client.info()
logger.info(f"Docker connected: {info['ServerVersion']}")
except Exception as e:
raise DockerError(f"Cannot connect to Docker: {e}")
async def _ensure_converter_image(self):
"""Проверка наличия converter образа"""
try:
image = self.docker_client.images.get(self.converter_image)
logger.info(f"Converter image found: {image.id[:12]}")
except docker.errors.ImageNotFound:
logger.error(f"Converter image not found: {self.converter_image}")
raise DockerError(f"Please build converter image: {self.converter_image}")
async def convert_file(self, input_file: str, output_file: str,
conversion_type: str, options: dict = None) -> str:
"""
Запуск конвертации файла в новом контейнере
Args:
input_file: Путь к входному файлу
output_file: Путь к выходному файлу
conversion_type: Тип конвертации (video_to_mp4, audio_to_mp3, etc.)
options: Дополнительные параметры конвертации
Returns:
str: ID задачи конвертации
"""
# Проверка лимита параллельных задач
if len(self.active_jobs) >= self.max_parallel_jobs:
raise ConverterError("Too many active conversion jobs")
# Генерация ID задачи
job_id = str(uuid4())
# Подготовка окружения для контейнера
container_config = await self._prepare_container_config(
job_id, input_file, output_file, conversion_type, options
)
try:
# Создание и запуск контейнера
container = self.docker_client.containers.run(
image=self.converter_image,
command=container_config["command"],
volumes=container_config["volumes"],
environment=container_config["environment"],
name=f"converter-{job_id}",
detach=True,
auto_remove=True, # Автоматическое удаление после завершения
cpu_count=1, # Ограничение ресурсов
mem_limit="1g"
)
# Регистрация активной задачи
self.active_jobs[job_id] = {
"container": container,
"start_time": datetime.utcnow(),
"input_file": input_file,
"output_file": output_file,
"conversion_type": conversion_type,
"status": "running"
}
logger.info(f"Conversion job {job_id} started in container {container.id[:12]}")
# Запуск мониторинга задачи в фоне
asyncio.create_task(self._monitor_conversion_job(job_id))
return job_id
except Exception as e:
logger.error(f"Failed to start conversion job {job_id}: {e}")
raise ConverterError(f"Conversion start failed: {e}")
async def _prepare_container_config(self, job_id: str, input_file: str,
output_file: str, conversion_type: str,
options: dict) -> dict:
"""Подготовка конфигурации контейнера"""
# Общая папка для файлов между хостом и контейнером
shared_volume = "/opt/my-network/shared"
container_input = f"/input/{os.path.basename(input_file)}"
container_output = f"/output/{os.path.basename(output_file)}"
# Копирование входного файла в shared папку
shared_input_path = f"{shared_volume}/input/{job_id}_{os.path.basename(input_file)}"
os.makedirs(os.path.dirname(shared_input_path), exist_ok=True)
shutil.copy2(input_file, shared_input_path)
# Команда для конвертации
command = self._build_conversion_command(
container_input, container_output, conversion_type, options
)
return {
"command": command,
"volumes": {
shared_volume: {
"bind": "/workspace",
"mode": "rw"
}
},
"environment": {
"JOB_ID": job_id,
"CONVERSION_TYPE": conversion_type,
"INPUT_FILE": container_input,
"OUTPUT_FILE": container_output
}
}
def _build_conversion_command(self, input_file: str, output_file: str,
conversion_type: str, options: dict) -> List[str]:
"""Построение команды конвертации"""
conversion_commands = {
"video_to_mp4": [
"ffmpeg", "-i", input_file,
"-c:v", "libx264", "-c:a", "aac",
"-preset", "medium", "-crf", "23",
output_file
],
"audio_to_mp3": [
"ffmpeg", "-i", input_file,
"-c:a", "libmp3lame", "-b:a", "192k",
output_file
],
"image_resize": [
"convert", input_file,
"-resize", options.get("size", "800x600"),
"-quality", str(options.get("quality", 85)),
output_file
],
"extract_thumbnail": [
"ffmpeg", "-i", input_file,
"-ss", "00:00:01", "-frames:v", "1",
"-q:v", "2", output_file
]
}
if conversion_type not in conversion_commands:
raise ConverterError(f"Unsupported conversion type: {conversion_type}")
return conversion_commands[conversion_type]
async def _monitor_conversion_job(self, job_id: str):
"""Мониторинг выполнения задачи конвертации"""
if job_id not in self.active_jobs:
return
job = self.active_jobs[job_id]
container = job["container"]
try:
# Ожидание завершения контейнера
result = container.wait(timeout=300) # 5 минут таймаут
# Получение логов
logs = container.logs().decode('utf-8')
# Обновление статуса задачи
if result["StatusCode"] == 0:
job["status"] = "completed"
job["end_time"] = datetime.utcnow()
# Перемещение выходного файла
await self._handle_conversion_success(job_id)
logger.info(f"Conversion job {job_id} completed successfully")
else:
job["status"] = "failed"
job["error"] = f"Container exited with code {result['StatusCode']}"
job["logs"] = logs
logger.error(f"Conversion job {job_id} failed: {job['error']}")
except docker.errors.ContainerError as e:
job["status"] = "failed"
job["error"] = str(e)
logger.error(f"Conversion job {job_id} container error: {e}")
except Exception as e:
job["status"] = "failed"
job["error"] = str(e)
logger.error(f"Conversion job {job_id} monitoring error: {e}")
finally:
# Очистка ресурсов
await self._cleanup_conversion_job(job_id)
async def _handle_conversion_success(self, job_id: str):
"""Обработка успешного завершения конвертации"""
job = self.active_jobs[job_id]
shared_volume = "/opt/my-network/shared"
# Путь к выходному файлу в shared папке
shared_output_path = f"{shared_volume}/output/{job_id}_{os.path.basename(job['output_file'])}"
if os.path.exists(shared_output_path):
# Перемещение файла в целевое расположение
os.makedirs(os.path.dirname(job["output_file"]), exist_ok=True)
shutil.move(shared_output_path, job["output_file"])
job["output_size"] = os.path.getsize(job["output_file"])
logger.info(f"Output file moved to: {job['output_file']}")
else:
job["status"] = "failed"
job["error"] = "Output file not found after conversion"
async def _cleanup_conversion_job(self, job_id: str):
"""Очистка ресурсов после завершения задачи"""
if job_id not in self.active_jobs:
return
job = self.active_jobs[job_id]
shared_volume = "/opt/my-network/shared"
# Удаление временных файлов
input_cleanup = f"{shared_volume}/input/{job_id}_*"
output_cleanup = f"{shared_volume}/output/{job_id}_*"
for pattern in [input_cleanup, output_cleanup]:
for file_path in glob.glob(pattern):
try:
os.remove(file_path)
except OSError:
pass
# Удаление задачи из активных (через некоторое время для логов)
await asyncio.sleep(60) # Сохраняем информацию 1 минуту
if job_id in self.active_jobs:
del self.active_jobs[job_id]
async def get_job_status(self, job_id: str) -> dict:
"""Получение статуса задачи конвертации"""
if job_id not in self.active_jobs:
return {"status": "not_found", "error": "Job not found"}
job = self.active_jobs[job_id]
status_info = {
"job_id": job_id,
"status": job["status"],
"conversion_type": job["conversion_type"],
"start_time": job["start_time"].isoformat(),
"input_file": job["input_file"],
"output_file": job["output_file"]
}
if "end_time" in job:
status_info["end_time"] = job["end_time"].isoformat()
duration = (job["end_time"] - job["start_time"]).total_seconds()
status_info["duration_seconds"] = duration
if "output_size" in job:
status_info["output_size"] = job["output_size"]
if "error" in job:
status_info["error"] = job["error"]
if "logs" in job:
status_info["logs"] = job["logs"]
return status_info
async def cancel_job(self, job_id: str) -> bool:
"""Отмена задачи конвертации"""
if job_id not in self.active_jobs:
return False
job = self.active_jobs[job_id]
container = job["container"]
try:
# Остановка контейнера
container.stop(timeout=10)
job["status"] = "cancelled"
job["end_time"] = datetime.utcnow()
logger.info(f"Conversion job {job_id} cancelled")
return True
except Exception as e:
logger.error(f"Failed to cancel job {job_id}: {e}")
return False
async def get_active_jobs(self) -> List[dict]:
"""Получение списка активных задач"""
active_jobs = []
for job_id, job in self.active_jobs.items():
if job["status"] == "running":
active_jobs.append({
"job_id": job_id,
"conversion_type": job["conversion_type"],
"start_time": job["start_time"].isoformat(),
"duration_seconds": (datetime.utcnow() - job["start_time"]).total_seconds()
})
return active_jobs
🔧 Docker Compose конфигурация
Обновленный docker-compose.yml
version: '3.8'
services:
app:
build:
context: .
dockerfile: Dockerfile
ports:
- "15100:15100"
volumes:
# Доступ к Docker socket для управления converter контейнерами
- ${DOCKER_SOCK_PATH:-/var/run/docker.sock}:/var/run/docker.sock
# Общая папка для обмена файлами с converter контейнерами
- ./shared:/opt/my-network/shared
# Хранилище контента
- ./storage:/opt/my-network/storage
environment:
- DOCKER_SOCK_PATH=/var/run/docker.sock
- CONVERTER_ENABLED=true
- CONVERTER_MAX_PARALLEL=3
depends_on:
- postgres
- redis
networks:
- my-network
# Converter образ (только для сборки, не запускается)
converter-module:
build:
context: ./converter-module
dockerfile: Dockerfile
image: my-converter:latest
profiles:
- build-only # Не запускается в обычном режиме
postgres:
image: postgres:15
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- my-network
redis:
image: redis:7-alpine
networks:
- my-network
volumes:
postgres_data:
networks:
my-network:
driver: bridge
📊 API для управления конвертацией
REST API endpoints
@app.route('/api/v1/converter/convert', methods=['POST'])
async def start_conversion(request):
"""Запуск задачи конвертации"""
data = request.json
required_fields = ['input_file', 'output_file', 'conversion_type']
for field in required_fields:
if field not in data:
return response.json({'error': f'Missing required field: {field}'}, status=400)
try:
job_id = await converter_manager.convert_file(
input_file=data['input_file'],
output_file=data['output_file'],
conversion_type=data['conversion_type'],
options=data.get('options', {})
)
return response.json({
'job_id': job_id,
'status': 'started',
'message': 'Conversion job started successfully'
})
except ConverterError as e:
return response.json({'error': str(e)}, status=400)
@app.route('/api/v1/converter/status/<job_id>', methods=['GET'])
async def get_conversion_status(request, job_id):
"""Получение статуса задачи конвертации"""
status = await converter_manager.get_job_status(job_id)
return response.json(status)
@app.route('/api/v1/converter/cancel/<job_id>', methods=['POST'])
async def cancel_conversion(request, job_id):
"""Отмена задачи конвертации"""
cancelled = await converter_manager.cancel_job(job_id)
if cancelled:
return response.json({'message': 'Job cancelled successfully'})
else:
return response.json({'error': 'Failed to cancel job or job not found'}, status=404)
@app.route('/api/v1/converter/active', methods=['GET'])
async def get_active_conversions(request):
"""Список активных задач конвертации"""
active_jobs = await converter_manager.get_active_jobs()
return response.json({
'active_jobs': active_jobs,
'total_active': len(active_jobs)
})
@app.route('/api/v1/converter/stats', methods=['GET'])
async def get_converter_stats(request):
"""Статистика работы конвертера"""
return response.json({
'max_parallel_jobs': converter_manager.max_parallel_jobs,
'active_jobs_count': len(converter_manager.active_jobs),
'docker_sock_path': converter_manager.docker_sock_path,
'converter_image': converter_manager.converter_image,
'supported_conversions': [
'video_to_mp4',
'audio_to_mp3',
'image_resize',
'extract_thumbnail'
]
})
🛠️ Converter Module Dockerfile
converter-module/Dockerfile
FROM ubuntu:22.04
# Установка зависимостей
RUN apt-get update && apt-get install -y \
ffmpeg \
imagemagick \
python3 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Установка Python зависимостей
RUN pip3 install \
pillow \
opencv-python
# Создание рабочей директории
WORKDIR /workspace
# Копирование скриптов конвертации
COPY scripts/ /usr/local/bin/
RUN chmod +x /usr/local/bin/*
# Создание директорий для файлов
RUN mkdir -p /input /output
# Точка входа
ENTRYPOINT ["/usr/local/bin/convert.sh"]
converter-module/scripts/convert.sh
#!/bin/bash
set -e
echo "Starting conversion job: $JOB_ID"
echo "Conversion type: $CONVERSION_TYPE"
echo "Input file: $INPUT_FILE"
echo "Output file: $OUTPUT_FILE"
# Функция логирования
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
# Проверка входного файла
if [ ! -f "/workspace$INPUT_FILE" ]; then
log "ERROR: Input file not found: $INPUT_FILE"
exit 1
fi
# Создание выходной директории
mkdir -p "$(dirname "/workspace$OUTPUT_FILE")"
# Выполнение конвертации в зависимости от типа
case "$CONVERSION_TYPE" in
"video_to_mp4")
log "Converting video to MP4..."
ffmpeg -i "/workspace$INPUT_FILE" \
-c:v libx264 -c:a aac \
-preset medium -crf 23 \
"/workspace$OUTPUT_FILE"
;;
"audio_to_mp3")
log "Converting audio to MP3..."
ffmpeg -i "/workspace$INPUT_FILE" \
-c:a libmp3lame -b:a 192k \
"/workspace$OUTPUT_FILE"
;;
"image_resize")
log "Resizing image..."
convert "/workspace$INPUT_FILE" \
-resize 800x600 \
-quality 85 \
"/workspace$OUTPUT_FILE"
;;
"extract_thumbnail")
log "Extracting thumbnail..."
ffmpeg -i "/workspace$INPUT_FILE" \
-ss 00:00:01 -frames:v 1 \
-q:v 2 \
"/workspace$OUTPUT_FILE"
;;
*)
log "ERROR: Unsupported conversion type: $CONVERSION_TYPE"
exit 1
;;
esac
# Проверка результата
if [ -f "/workspace$OUTPUT_FILE" ]; then
log "Conversion completed successfully"
log "Output file size: $(du -h "/workspace$OUTPUT_FILE" | cut -f1)"
exit 0
else
log "ERROR: Output file was not created"
exit 1
fi
🔍 Мониторинг и логирование
Логирование converter операций
class ConverterLogger:
"""Специализированная система логирования для converter операций"""
def __init__(self):
self.logger = logging.getLogger('converter')
self.converter_metrics = {
'total_jobs': 0,
'successful_jobs': 0,
'failed_jobs': 0,
'total_processing_time': 0
}
def log_job_start(self, job_id: str, conversion_type: str, input_file: str):
"""Логирование начала задачи"""
self.logger.info({
'event': 'job_started',
'job_id': job_id,
'conversion_type': conversion_type,
'input_file': os.path.basename(input_file),
'timestamp': datetime.utcnow().isoformat()
})
self.converter_metrics['total_jobs'] += 1
def log_job_completion(self, job_id: str, success: bool, duration: float, output_size: int = None):
"""Логирование завершения задачи"""
if success:
self.converter_metrics['successful_jobs'] += 1
event = 'job_completed'
else:
self.converter_metrics['failed_jobs'] += 1
event = 'job_failed'
self.converter_metrics['total_processing_time'] += duration
log_data = {
'event': event,
'job_id': job_id,
'duration_seconds': duration,
'timestamp': datetime.utcnow().isoformat()
}
if output_size:
log_data['output_size_bytes'] = output_size
self.logger.info(log_data)
def get_metrics(self) -> dict:
"""Получение метрик converter модуля"""
total_jobs = self.converter_metrics['total_jobs']
return {
'total_jobs': total_jobs,
'successful_jobs': self.converter_metrics['successful_jobs'],
'failed_jobs': self.converter_metrics['failed_jobs'],
'success_rate': self.converter_metrics['successful_jobs'] / total_jobs if total_jobs > 0 else 0,
'average_processing_time': self.converter_metrics['total_processing_time'] / total_jobs if total_jobs > 0 else 0
}
🚀 Интеграция с MY Network
Автоматическая конвертация при загрузке контента
class ContentProcessingPipeline:
"""Конвейер обработки контента с автоматической конвертацией"""
async def process_uploaded_content(self, content: StoredContent):
"""Обработка загруженного контента"""
# Определение типа контента и необходимых конвертаций
conversions_needed = self._determine_conversions(content)
for conversion in conversions_needed:
try:
job_id = await converter_manager.convert_file(
input_file=content.file_path,
output_file=conversion['output_path'],
conversion_type=conversion['type'],
options=conversion.get('options', {})
)
# Сохранение связи между контентом и задачей конвертации
await self._link_content_conversion(content.id, job_id, conversion['type'])
except ConverterError as e:
logger.error(f"Failed to start conversion for content {content.id}: {e}")
def _determine_conversions(self, content: StoredContent) -> List[dict]:
"""Определение необходимых конвертаций"""
conversions = []
if content.content_type.startswith('video/'):
# Для видео создаем preview и thumbnail
conversions.extend([
{
'type': 'video_to_mp4',
'output_path': content.file_path.replace('.', '_preview.') + 'mp4',
'options': {'quality': 'medium', 'resolution': '720p'}
},
{
'type': 'extract_thumbnail',
'output_path': content.file_path.replace('.', '_thumb.') + 'jpg'
}
])
elif content.content_type.startswith('audio/'):
# Для аудио создаем preview версию
conversions.append({
'type': 'audio_to_mp3',
'output_path': content.file_path.replace('.', '_preview.') + 'mp3',
'options': {'bitrate': '128k', 'duration': '30'} # 30 секунд preview
})
elif content.content_type.startswith('image/'):
# Для изображений создаем thumbnail
conversions.append({
'type': 'image_resize',
'output_path': content.file_path.replace('.', '_thumb.') + 'jpg',
'options': {'size': '300x300', 'quality': 80}
})
return conversions
Эта система обеспечивает гибкую и эффективную обработку медиа файлов с полной интеграцией в MY Network архитектуру.