uploader-bot/docs/CONVERTER_MODULE.md

771 lines
28 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# MY Network v3.0 - Converter Module и Docker Integration
## 🔄 Обзор Converter Module
Converter Module представляет собой on-demand систему конвертации медиа файлов, работающую через Docker контейнеры. Модуль запускается только при необходимости обработки файлов и автоматически завершает работу после выполнения задачи.
## 🐳 Архитектура Docker Integration
### Принцип работы
```mermaid
graph TB
API[MY Network API] --> CM[Converter Manager]
CM --> DS[Docker Socket]
DS --> DI[Docker Image: my-converter]
subgraph "On-Demand Containers"
C1[Converter Container 1]
C2[Converter Container 2]
C3[Converter Container N]
end
DI -.->|Creates| C1
DI -.->|Creates| C2
DI -.->|Creates| C3
C1 --> FS[Shared File System]
C2 --> FS
C3 --> FS
C1 -.->|Auto-Remove| X1[❌]
C2 -.->|Auto-Remove| X2[❌]
C3 -.->|Auto-Remove| X3[❌]
```
### Docker Socket Integration
```python
class DockerConverterManager:
"""Управление converter контейнерами через Docker API"""
def __init__(self, docker_sock_path: str = "/var/run/docker.sock"):
self.docker_sock_path = docker_sock_path
self.docker_client = None
self.converter_image = "my-converter:latest"
self.max_parallel_jobs = 3
self.active_jobs = {}
async def initialize(self):
"""Инициализация Docker клиента"""
try:
# Проверка доступности Docker socket
if not os.path.exists(self.docker_sock_path):
raise DockerError(f"Docker socket not found: {self.docker_sock_path}")
# Создание Docker клиента
self.docker_client = docker.DockerClient(
base_url=f"unix://{self.docker_sock_path}"
)
# Проверка связи с Docker
await self._test_docker_connection()
# Проверка наличия converter образа
await self._ensure_converter_image()
logger.info("Docker converter manager initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Docker converter manager: {e}")
raise
async def _test_docker_connection(self):
"""Проверка подключения к Docker"""
try:
info = self.docker_client.info()
logger.info(f"Docker connected: {info['ServerVersion']}")
except Exception as e:
raise DockerError(f"Cannot connect to Docker: {e}")
async def _ensure_converter_image(self):
"""Проверка наличия converter образа"""
try:
image = self.docker_client.images.get(self.converter_image)
logger.info(f"Converter image found: {image.id[:12]}")
except docker.errors.ImageNotFound:
logger.error(f"Converter image not found: {self.converter_image}")
raise DockerError(f"Please build converter image: {self.converter_image}")
async def convert_file(self, input_file: str, output_file: str,
conversion_type: str, options: dict = None) -> str:
"""
Запуск конвертации файла в новом контейнере
Args:
input_file: Путь к входному файлу
output_file: Путь к выходному файлу
conversion_type: Тип конвертации (video_to_mp4, audio_to_mp3, etc.)
options: Дополнительные параметры конвертации
Returns:
str: ID задачи конвертации
"""
# Проверка лимита параллельных задач
if len(self.active_jobs) >= self.max_parallel_jobs:
raise ConverterError("Too many active conversion jobs")
# Генерация ID задачи
job_id = str(uuid4())
# Подготовка окружения для контейнера
container_config = await self._prepare_container_config(
job_id, input_file, output_file, conversion_type, options
)
try:
# Создание и запуск контейнера
container = self.docker_client.containers.run(
image=self.converter_image,
command=container_config["command"],
volumes=container_config["volumes"],
environment=container_config["environment"],
name=f"converter-{job_id}",
detach=True,
auto_remove=True, # Автоматическое удаление после завершения
cpu_count=1, # Ограничение ресурсов
mem_limit="1g"
)
# Регистрация активной задачи
self.active_jobs[job_id] = {
"container": container,
"start_time": datetime.utcnow(),
"input_file": input_file,
"output_file": output_file,
"conversion_type": conversion_type,
"status": "running"
}
logger.info(f"Conversion job {job_id} started in container {container.id[:12]}")
# Запуск мониторинга задачи в фоне
asyncio.create_task(self._monitor_conversion_job(job_id))
return job_id
except Exception as e:
logger.error(f"Failed to start conversion job {job_id}: {e}")
raise ConverterError(f"Conversion start failed: {e}")
async def _prepare_container_config(self, job_id: str, input_file: str,
output_file: str, conversion_type: str,
options: dict) -> dict:
"""Подготовка конфигурации контейнера"""
# Общая папка для файлов между хостом и контейнером
shared_volume = "/opt/my-network/shared"
container_input = f"/input/{os.path.basename(input_file)}"
container_output = f"/output/{os.path.basename(output_file)}"
# Копирование входного файла в shared папку
shared_input_path = f"{shared_volume}/input/{job_id}_{os.path.basename(input_file)}"
os.makedirs(os.path.dirname(shared_input_path), exist_ok=True)
shutil.copy2(input_file, shared_input_path)
# Команда для конвертации
command = self._build_conversion_command(
container_input, container_output, conversion_type, options
)
return {
"command": command,
"volumes": {
shared_volume: {
"bind": "/workspace",
"mode": "rw"
}
},
"environment": {
"JOB_ID": job_id,
"CONVERSION_TYPE": conversion_type,
"INPUT_FILE": container_input,
"OUTPUT_FILE": container_output
}
}
def _build_conversion_command(self, input_file: str, output_file: str,
conversion_type: str, options: dict) -> List[str]:
"""Построение команды конвертации"""
conversion_commands = {
"video_to_mp4": [
"ffmpeg", "-i", input_file,
"-c:v", "libx264", "-c:a", "aac",
"-preset", "medium", "-crf", "23",
output_file
],
"audio_to_mp3": [
"ffmpeg", "-i", input_file,
"-c:a", "libmp3lame", "-b:a", "192k",
output_file
],
"image_resize": [
"convert", input_file,
"-resize", options.get("size", "800x600"),
"-quality", str(options.get("quality", 85)),
output_file
],
"extract_thumbnail": [
"ffmpeg", "-i", input_file,
"-ss", "00:00:01", "-frames:v", "1",
"-q:v", "2", output_file
]
}
if conversion_type not in conversion_commands:
raise ConverterError(f"Unsupported conversion type: {conversion_type}")
return conversion_commands[conversion_type]
async def _monitor_conversion_job(self, job_id: str):
"""Мониторинг выполнения задачи конвертации"""
if job_id not in self.active_jobs:
return
job = self.active_jobs[job_id]
container = job["container"]
try:
# Ожидание завершения контейнера
result = container.wait(timeout=300) # 5 минут таймаут
# Получение логов
logs = container.logs().decode('utf-8')
# Обновление статуса задачи
if result["StatusCode"] == 0:
job["status"] = "completed"
job["end_time"] = datetime.utcnow()
# Перемещение выходного файла
await self._handle_conversion_success(job_id)
logger.info(f"Conversion job {job_id} completed successfully")
else:
job["status"] = "failed"
job["error"] = f"Container exited with code {result['StatusCode']}"
job["logs"] = logs
logger.error(f"Conversion job {job_id} failed: {job['error']}")
except docker.errors.ContainerError as e:
job["status"] = "failed"
job["error"] = str(e)
logger.error(f"Conversion job {job_id} container error: {e}")
except Exception as e:
job["status"] = "failed"
job["error"] = str(e)
logger.error(f"Conversion job {job_id} monitoring error: {e}")
finally:
# Очистка ресурсов
await self._cleanup_conversion_job(job_id)
async def _handle_conversion_success(self, job_id: str):
"""Обработка успешного завершения конвертации"""
job = self.active_jobs[job_id]
shared_volume = "/opt/my-network/shared"
# Путь к выходному файлу в shared папке
shared_output_path = f"{shared_volume}/output/{job_id}_{os.path.basename(job['output_file'])}"
if os.path.exists(shared_output_path):
# Перемещение файла в целевое расположение
os.makedirs(os.path.dirname(job["output_file"]), exist_ok=True)
shutil.move(shared_output_path, job["output_file"])
job["output_size"] = os.path.getsize(job["output_file"])
logger.info(f"Output file moved to: {job['output_file']}")
else:
job["status"] = "failed"
job["error"] = "Output file not found after conversion"
async def _cleanup_conversion_job(self, job_id: str):
"""Очистка ресурсов после завершения задачи"""
if job_id not in self.active_jobs:
return
job = self.active_jobs[job_id]
shared_volume = "/opt/my-network/shared"
# Удаление временных файлов
input_cleanup = f"{shared_volume}/input/{job_id}_*"
output_cleanup = f"{shared_volume}/output/{job_id}_*"
for pattern in [input_cleanup, output_cleanup]:
for file_path in glob.glob(pattern):
try:
os.remove(file_path)
except OSError:
pass
# Удаление задачи из активных (через некоторое время для логов)
await asyncio.sleep(60) # Сохраняем информацию 1 минуту
if job_id in self.active_jobs:
del self.active_jobs[job_id]
async def get_job_status(self, job_id: str) -> dict:
"""Получение статуса задачи конвертации"""
if job_id not in self.active_jobs:
return {"status": "not_found", "error": "Job not found"}
job = self.active_jobs[job_id]
status_info = {
"job_id": job_id,
"status": job["status"],
"conversion_type": job["conversion_type"],
"start_time": job["start_time"].isoformat(),
"input_file": job["input_file"],
"output_file": job["output_file"]
}
if "end_time" in job:
status_info["end_time"] = job["end_time"].isoformat()
duration = (job["end_time"] - job["start_time"]).total_seconds()
status_info["duration_seconds"] = duration
if "output_size" in job:
status_info["output_size"] = job["output_size"]
if "error" in job:
status_info["error"] = job["error"]
if "logs" in job:
status_info["logs"] = job["logs"]
return status_info
async def cancel_job(self, job_id: str) -> bool:
"""Отмена задачи конвертации"""
if job_id not in self.active_jobs:
return False
job = self.active_jobs[job_id]
container = job["container"]
try:
# Остановка контейнера
container.stop(timeout=10)
job["status"] = "cancelled"
job["end_time"] = datetime.utcnow()
logger.info(f"Conversion job {job_id} cancelled")
return True
except Exception as e:
logger.error(f"Failed to cancel job {job_id}: {e}")
return False
async def get_active_jobs(self) -> List[dict]:
"""Получение списка активных задач"""
active_jobs = []
for job_id, job in self.active_jobs.items():
if job["status"] == "running":
active_jobs.append({
"job_id": job_id,
"conversion_type": job["conversion_type"],
"start_time": job["start_time"].isoformat(),
"duration_seconds": (datetime.utcnow() - job["start_time"]).total_seconds()
})
return active_jobs
```
## 🔧 Docker Compose конфигурация
### Обновленный docker-compose.yml
```yaml
version: '3.8'
services:
app:
build:
context: .
dockerfile: Dockerfile
ports:
- "15100:15100"
volumes:
# Доступ к Docker socket для управления converter контейнерами
- ${DOCKER_SOCK_PATH:-/var/run/docker.sock}:/var/run/docker.sock
# Общая папка для обмена файлами с converter контейнерами
- ./shared:/opt/my-network/shared
# Хранилище контента
- ./storage:/opt/my-network/storage
environment:
- DOCKER_SOCK_PATH=/var/run/docker.sock
- CONVERTER_ENABLED=true
- CONVERTER_MAX_PARALLEL=3
depends_on:
- postgres
- redis
networks:
- my-network
# Converter образ (только для сборки, не запускается)
converter-module:
build:
context: ./converter-module
dockerfile: Dockerfile
image: my-converter:latest
profiles:
- build-only # Не запускается в обычном режиме
postgres:
image: postgres:15
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- my-network
redis:
image: redis:7-alpine
networks:
- my-network
volumes:
postgres_data:
networks:
my-network:
driver: bridge
```
## 📊 API для управления конвертацией
### REST API endpoints
```python
@app.route('/api/v1/converter/convert', methods=['POST'])
async def start_conversion(request):
"""Запуск задачи конвертации"""
data = request.json
required_fields = ['input_file', 'output_file', 'conversion_type']
for field in required_fields:
if field not in data:
return response.json({'error': f'Missing required field: {field}'}, status=400)
try:
job_id = await converter_manager.convert_file(
input_file=data['input_file'],
output_file=data['output_file'],
conversion_type=data['conversion_type'],
options=data.get('options', {})
)
return response.json({
'job_id': job_id,
'status': 'started',
'message': 'Conversion job started successfully'
})
except ConverterError as e:
return response.json({'error': str(e)}, status=400)
@app.route('/api/v1/converter/status/<job_id>', methods=['GET'])
async def get_conversion_status(request, job_id):
"""Получение статуса задачи конвертации"""
status = await converter_manager.get_job_status(job_id)
return response.json(status)
@app.route('/api/v1/converter/cancel/<job_id>', methods=['POST'])
async def cancel_conversion(request, job_id):
"""Отмена задачи конвертации"""
cancelled = await converter_manager.cancel_job(job_id)
if cancelled:
return response.json({'message': 'Job cancelled successfully'})
else:
return response.json({'error': 'Failed to cancel job or job not found'}, status=404)
@app.route('/api/v1/converter/active', methods=['GET'])
async def get_active_conversions(request):
"""Список активных задач конвертации"""
active_jobs = await converter_manager.get_active_jobs()
return response.json({
'active_jobs': active_jobs,
'total_active': len(active_jobs)
})
@app.route('/api/v1/converter/stats', methods=['GET'])
async def get_converter_stats(request):
"""Статистика работы конвертера"""
return response.json({
'max_parallel_jobs': converter_manager.max_parallel_jobs,
'active_jobs_count': len(converter_manager.active_jobs),
'docker_sock_path': converter_manager.docker_sock_path,
'converter_image': converter_manager.converter_image,
'supported_conversions': [
'video_to_mp4',
'audio_to_mp3',
'image_resize',
'extract_thumbnail'
]
})
```
## 🛠️ Converter Module Dockerfile
### converter-module/Dockerfile
```dockerfile
FROM ubuntu:22.04
# Установка зависимостей
RUN apt-get update && apt-get install -y \
ffmpeg \
imagemagick \
python3 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Установка Python зависимостей
RUN pip3 install \
pillow \
opencv-python
# Создание рабочей директории
WORKDIR /workspace
# Копирование скриптов конвертации
COPY scripts/ /usr/local/bin/
RUN chmod +x /usr/local/bin/*
# Создание директорий для файлов
RUN mkdir -p /input /output
# Точка входа
ENTRYPOINT ["/usr/local/bin/convert.sh"]
```
### converter-module/scripts/convert.sh
```bash
#!/bin/bash
set -e
echo "Starting conversion job: $JOB_ID"
echo "Conversion type: $CONVERSION_TYPE"
echo "Input file: $INPUT_FILE"
echo "Output file: $OUTPUT_FILE"
# Функция логирования
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
# Проверка входного файла
if [ ! -f "/workspace$INPUT_FILE" ]; then
log "ERROR: Input file not found: $INPUT_FILE"
exit 1
fi
# Создание выходной директории
mkdir -p "$(dirname "/workspace$OUTPUT_FILE")"
# Выполнение конвертации в зависимости от типа
case "$CONVERSION_TYPE" in
"video_to_mp4")
log "Converting video to MP4..."
ffmpeg -i "/workspace$INPUT_FILE" \
-c:v libx264 -c:a aac \
-preset medium -crf 23 \
"/workspace$OUTPUT_FILE"
;;
"audio_to_mp3")
log "Converting audio to MP3..."
ffmpeg -i "/workspace$INPUT_FILE" \
-c:a libmp3lame -b:a 192k \
"/workspace$OUTPUT_FILE"
;;
"image_resize")
log "Resizing image..."
convert "/workspace$INPUT_FILE" \
-resize 800x600 \
-quality 85 \
"/workspace$OUTPUT_FILE"
;;
"extract_thumbnail")
log "Extracting thumbnail..."
ffmpeg -i "/workspace$INPUT_FILE" \
-ss 00:00:01 -frames:v 1 \
-q:v 2 \
"/workspace$OUTPUT_FILE"
;;
*)
log "ERROR: Unsupported conversion type: $CONVERSION_TYPE"
exit 1
;;
esac
# Проверка результата
if [ -f "/workspace$OUTPUT_FILE" ]; then
log "Conversion completed successfully"
log "Output file size: $(du -h "/workspace$OUTPUT_FILE" | cut -f1)"
exit 0
else
log "ERROR: Output file was not created"
exit 1
fi
```
## 🔍 Мониторинг и логирование
### Логирование converter операций
```python
class ConverterLogger:
"""Специализированная система логирования для converter операций"""
def __init__(self):
self.logger = logging.getLogger('converter')
self.converter_metrics = {
'total_jobs': 0,
'successful_jobs': 0,
'failed_jobs': 0,
'total_processing_time': 0
}
def log_job_start(self, job_id: str, conversion_type: str, input_file: str):
"""Логирование начала задачи"""
self.logger.info({
'event': 'job_started',
'job_id': job_id,
'conversion_type': conversion_type,
'input_file': os.path.basename(input_file),
'timestamp': datetime.utcnow().isoformat()
})
self.converter_metrics['total_jobs'] += 1
def log_job_completion(self, job_id: str, success: bool, duration: float, output_size: int = None):
"""Логирование завершения задачи"""
if success:
self.converter_metrics['successful_jobs'] += 1
event = 'job_completed'
else:
self.converter_metrics['failed_jobs'] += 1
event = 'job_failed'
self.converter_metrics['total_processing_time'] += duration
log_data = {
'event': event,
'job_id': job_id,
'duration_seconds': duration,
'timestamp': datetime.utcnow().isoformat()
}
if output_size:
log_data['output_size_bytes'] = output_size
self.logger.info(log_data)
def get_metrics(self) -> dict:
"""Получение метрик converter модуля"""
total_jobs = self.converter_metrics['total_jobs']
return {
'total_jobs': total_jobs,
'successful_jobs': self.converter_metrics['successful_jobs'],
'failed_jobs': self.converter_metrics['failed_jobs'],
'success_rate': self.converter_metrics['successful_jobs'] / total_jobs if total_jobs > 0 else 0,
'average_processing_time': self.converter_metrics['total_processing_time'] / total_jobs if total_jobs > 0 else 0
}
```
## 🚀 Интеграция с MY Network
### Автоматическая конвертация при загрузке контента
```python
class ContentProcessingPipeline:
"""Конвейер обработки контента с автоматической конвертацией"""
async def process_uploaded_content(self, content: StoredContent):
"""Обработка загруженного контента"""
# Определение типа контента и необходимых конвертаций
conversions_needed = self._determine_conversions(content)
for conversion in conversions_needed:
try:
job_id = await converter_manager.convert_file(
input_file=content.file_path,
output_file=conversion['output_path'],
conversion_type=conversion['type'],
options=conversion.get('options', {})
)
# Сохранение связи между контентом и задачей конвертации
await self._link_content_conversion(content.id, job_id, conversion['type'])
except ConverterError as e:
logger.error(f"Failed to start conversion for content {content.id}: {e}")
def _determine_conversions(self, content: StoredContent) -> List[dict]:
"""Определение необходимых конвертаций"""
conversions = []
if content.content_type.startswith('video/'):
# Для видео создаем preview и thumbnail
conversions.extend([
{
'type': 'video_to_mp4',
'output_path': content.file_path.replace('.', '_preview.') + 'mp4',
'options': {'quality': 'medium', 'resolution': '720p'}
},
{
'type': 'extract_thumbnail',
'output_path': content.file_path.replace('.', '_thumb.') + 'jpg'
}
])
elif content.content_type.startswith('audio/'):
# Для аудио создаем preview версию
conversions.append({
'type': 'audio_to_mp3',
'output_path': content.file_path.replace('.', '_preview.') + 'mp3',
'options': {'bitrate': '128k', 'duration': '30'} # 30 секунд preview
})
elif content.content_type.startswith('image/'):
# Для изображений создаем thumbnail
conversions.append({
'type': 'image_resize',
'output_path': content.file_path.replace('.', '_thumb.') + 'jpg',
'options': {'size': '300x300', 'quality': 80}
})
return conversions
```
Эта система обеспечивает гибкую и эффективную обработку медиа файлов с полной интеграцией в MY Network архитектуру.