"""Media conversion service for processing uploaded files."""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Set, Any, Tuple
|
|
|
|
import aiofiles
|
|
import redis.asyncio as redis
|
|
from PIL import Image, ImageOps
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.database import get_async_session
|
|
from app.core.models.content import Content, FileUpload
|
|
from app.core.storage import storage_manager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ConvertService:
    """Service for converting and processing uploaded media files."""

    def __init__(self):
        self.settings = get_settings()
        self.redis_client: Optional[redis.Redis] = None
        self.is_running = False
        self.tasks: Set[asyncio.Task] = set()

        # Conversion configuration
        self.batch_size = 10
        self.process_interval = 5  # seconds
        self.max_retries = 3
        self.temp_dir = Path(tempfile.gettempdir()) / "uploader_convert"
        self.temp_dir.mkdir(exist_ok=True)

        # Supported formats
        self.image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff'}
        self.video_formats = {'.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.mkv'}
        self.audio_formats = {'.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac'}
        self.document_formats = {'.pdf', '.doc', '.docx', '.txt', '.rtf'}

        # Image processing settings
        self.thumbnail_sizes = [(150, 150), (300, 300), (800, 600)]
        self.image_quality = 85
        self.max_image_size = (2048, 2048)

    async def start(self) -> None:
        """Start the conversion service."""
        try:
            logger.info("Starting media conversion service")

            # Initialize Redis connection
            self.redis_client = redis.from_url(
                self.settings.redis_url,
                encoding="utf-8",
                decode_responses=True,
                socket_keepalive=True,
                socket_keepalive_options={},
                health_check_interval=30,
            )

            # Test Redis connection
            await self.redis_client.ping()
            logger.info("Redis connection established for converter")

            # Start conversion tasks
            self.is_running = True

            # Create conversion tasks
            tasks = [
                asyncio.create_task(self._process_pending_files_loop()),
                asyncio.create_task(self._cleanup_temp_files_loop()),
                asyncio.create_task(self._retry_failed_conversions_loop()),
            ]

            self.tasks.update(tasks)

            # Wait for all tasks
            await asyncio.gather(*tasks, return_exceptions=True)

        except Exception as e:
            logger.error(f"Error starting conversion service: {e}")
            await self.stop()
            raise

    async def stop(self) -> None:
        """Stop the conversion service."""
        logger.info("Stopping media conversion service")
        self.is_running = False

        # Cancel all tasks
        for task in self.tasks:
            if not task.done():
                task.cancel()

        # Wait for tasks to complete
        if self.tasks:
            await asyncio.gather(*self.tasks, return_exceptions=True)

        # Close Redis connection
        if self.redis_client:
            await self.redis_client.close()

        # Cleanup temp directory
        await self._cleanup_temp_directory()

        logger.info("Conversion service stopped")

    async def _process_pending_files_loop(self) -> None:
        """Main loop for processing pending file conversions."""
        logger.info("Starting file conversion loop")

        while self.is_running:
            try:
                await self._process_pending_files()
                await asyncio.sleep(self.process_interval)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in file conversion loop: {e}")
                await asyncio.sleep(self.process_interval)

    async def _process_pending_files(self) -> None:
        """Process pending file conversions."""
        async with get_async_session() as session:
            try:
                # Get pending uploads
                result = await session.execute(
                    select(FileUpload)
                    .where(
                        FileUpload.status == "uploaded",
                        FileUpload.processed == False
                    )
                    .limit(self.batch_size)
                )
                uploads = result.scalars().all()

                if not uploads:
                    return

                logger.info(f"Processing {len(uploads)} pending files")

                # Process each upload
                for upload in uploads:
                    await self._process_single_file(session, upload)

                await session.commit()

            except Exception as e:
                logger.error(f"Error processing pending files: {e}")
                await session.rollback()

    async def _process_single_file(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process a single file upload."""
        try:
            logger.info(f"Processing file: {upload.filename}")

            # Mark as processing
            upload.status = "processing"
            upload.processing_started_at = datetime.utcnow()
            await session.commit()

            # Get file extension
            file_ext = Path(upload.filename).suffix.lower()

            # Process based on file type
            if file_ext in self.image_formats:
                await self._process_image(session, upload)
            elif file_ext in self.video_formats:
                await self._process_video(session, upload)
            elif file_ext in self.audio_formats:
                await self._process_audio(session, upload)
            elif file_ext in self.document_formats:
                await self._process_document(session, upload)
            else:
                # Just mark as processed for unsupported formats
                upload.status = "completed"
                upload.processed = True
                upload.processing_completed_at = datetime.utcnow()

            # Cache processing result
            cache_key = f"processed:{upload.id}"
            processing_info = {
                "status": upload.status,
                "processed_at": datetime.utcnow().isoformat(),
                "metadata": upload.metadata or {}
            }
            await self.redis_client.setex(cache_key, 3600, json.dumps(processing_info))

        except Exception as e:
            logger.error(f"Error processing file {upload.filename}: {e}")

            # Mark as failed
            upload.status = "failed"
            upload.error_message = str(e)
            upload.retry_count = (upload.retry_count or 0) + 1

            if upload.retry_count >= self.max_retries:
                upload.processed = True  # Stop retrying

    async def _process_image(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process an image file."""
        try:
            # Download original file
            original_path = await self._download_file(upload)

            if not original_path:
                raise Exception("Failed to download original file")

            # Open image
            with Image.open(original_path) as img:
                # Extract metadata
                metadata = {
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "has_transparency": img.mode in ('RGBA', 'LA') or 'transparency' in img.info
                }

                # Fix orientation
                img = ImageOps.exif_transpose(img)

                # Resize if too large
                if img.size[0] > self.max_image_size[0] or img.size[1] > self.max_image_size[1]:
                    img.thumbnail(self.max_image_size, Image.Resampling.LANCZOS)
                    metadata["resized"] = True

                # Save optimized version
                optimized_path = self.temp_dir / f"optimized_{upload.id}.jpg"

                # Convert to RGB if necessary
                if img.mode in ('RGBA', 'LA'):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    if img.mode == 'LA':
                        img = img.convert('RGBA')
                    background.paste(img, mask=img.split()[-1])
                    img = background
                elif img.mode != 'RGB':
                    img = img.convert('RGB')

                img.save(
                    optimized_path,
                    'JPEG',
                    quality=self.image_quality,
                    optimize=True
                )

            # Upload optimized version
            optimized_url = await storage_manager.upload_file(
                str(optimized_path),
                f"optimized/{upload.id}/image.jpg"
            )

            # Generate thumbnails
            thumbnails = {}
            for size in self.thumbnail_sizes:
                thumbnail_path = await self._create_thumbnail(original_path, size)
                if thumbnail_path:
                    thumb_url = await storage_manager.upload_file(
                        str(thumbnail_path),
                        f"thumbnails/{upload.id}/{size[0]}x{size[1]}.jpg"
                    )
                    thumbnails[f"{size[0]}x{size[1]}"] = thumb_url
                    thumbnail_path.unlink()  # Cleanup

            # Update upload record
            upload.metadata = {
                **metadata,
                "thumbnails": thumbnails,
                "optimized_url": optimized_url
            }
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup temp files
            original_path.unlink()
            optimized_path.unlink()

        except Exception as e:
            logger.error(f"Error processing image {upload.filename}: {e}")
            raise

    async def _process_video(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process a video file."""
        try:
            # For video processing, we would typically use ffmpeg.
            # This is a simplified version that just extracts basic info.

            original_path = await self._download_file(upload)
            if not original_path:
                raise Exception("Failed to download original file")

            # Basic video metadata (would use ffprobe in a real implementation)
            metadata = {
                "type": "video",
                "file_size": original_path.stat().st_size,
                "processing_note": "Video processing requires ffmpeg implementation"
            }

            # Generate video thumbnail (simplified)
            thumbnail_path = await self._create_video_thumbnail(original_path)
            if thumbnail_path:
                thumb_url = await storage_manager.upload_file(
                    str(thumbnail_path),
                    f"thumbnails/{upload.id}/video_thumb.jpg"
                )
                metadata["thumbnail"] = thumb_url
                thumbnail_path.unlink()

            # Update upload record
            upload.metadata = metadata
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup
            original_path.unlink()

        except Exception as e:
            logger.error(f"Error processing video {upload.filename}: {e}")
            raise

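    # The stub metadata above leaves real probing unimplemented. Below is a
    # minimal sketch of what an ffprobe-based probe could look like, assuming
    # ffprobe is installed and on PATH; the method name and selected fields
    # are illustrative, not part of the existing service API.
    async def _probe_video_metadata(self, video_path: Path) -> Optional[Dict[str, Any]]:
        """Sketch: extract duration and resolution via ffprobe's JSON output."""
        try:
            proc = await asyncio.create_subprocess_exec(
                "ffprobe", "-v", "quiet", "-print_format", "json",
                "-show_format", "-show_streams", str(video_path),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            if proc.returncode != 0:
                return None
            probe = json.loads(stdout)
            # Pick the first video stream, if any, for width/height
            video_stream = next(
                (s for s in probe.get("streams", []) if s.get("codec_type") == "video"),
                None,
            )
            return {
                "duration": float(probe.get("format", {}).get("duration", 0)),
                "width": video_stream.get("width") if video_stream else None,
                "height": video_stream.get("height") if video_stream else None,
            }
        except Exception as e:
            logger.error(f"Error probing video metadata: {e}")
            return None
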
    async def _process_audio(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process an audio file."""
        try:
            original_path = await self._download_file(upload)
            if not original_path:
                raise Exception("Failed to download original file")

            # Basic audio metadata
            metadata = {
                "type": "audio",
                "file_size": original_path.stat().st_size,
                "processing_note": "Audio processing requires additional libraries"
            }

            # Update upload record
            upload.metadata = metadata
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup
            original_path.unlink()

        except Exception as e:
            logger.error(f"Error processing audio {upload.filename}: {e}")
            raise

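    # The stub metadata above records only the file size. Below is a minimal
    # sketch of real tag extraction, assuming the optional mutagen package is
    # available; mutagen and the helper name are illustrative choices, not
    # existing dependencies of this service.
    def _read_audio_tags(self, audio_path: Path) -> Optional[Dict[str, Any]]:
        """Sketch: read duration and bitrate with mutagen."""
        try:
            from mutagen import File as MutagenFile  # optional dependency (assumption)

            audio = MutagenFile(str(audio_path))
            if audio is None or audio.info is None:
                return None
            return {
                "duration_seconds": getattr(audio.info, "length", None),
                "bitrate": getattr(audio.info, "bitrate", None),
            }
        except Exception as e:
            logger.error(f"Error reading audio tags: {e}")
            return None
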
    async def _process_document(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process a document file."""
        try:
            original_path = await self._download_file(upload)
            if not original_path:
                raise Exception("Failed to download original file")

            # Basic document metadata
            metadata = {
                "type": "document",
                "file_size": original_path.stat().st_size,
                "pages": 1,  # Would extract the actual page count for PDFs
                "processing_note": "Document processing requires additional libraries"
            }

            # Update upload record
            upload.metadata = metadata
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup
            original_path.unlink()

        except Exception as e:
            logger.error(f"Error processing document {upload.filename}: {e}")
            raise

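    # The hard-coded "pages": 1 above could be replaced with a real count.
    # A minimal sketch, assuming the optional pypdf package is installed;
    # pypdf and the helper name are illustrative, not existing dependencies
    # of this service.
    def _count_pdf_pages(self, pdf_path: Path) -> Optional[int]:
        """Sketch: read the page count of a PDF with pypdf."""
        try:
            from pypdf import PdfReader  # optional dependency (assumption)

            return len(PdfReader(str(pdf_path)).pages)
        except Exception as e:
            logger.error(f"Error counting PDF pages: {e}")
            return None
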
    async def _download_file(self, upload: FileUpload) -> Optional[Path]:
        """Download a file for processing."""
        try:
            if not upload.file_path:
                return None

            # Create temp file path
            temp_path = self.temp_dir / f"original_{upload.id}_{upload.filename}"

            # Download file from storage
            file_data = await storage_manager.get_file(upload.file_path)
            if not file_data:
                return None

            # Write to temp file
            async with aiofiles.open(temp_path, 'wb') as f:
                await f.write(file_data)

            return temp_path

        except Exception as e:
            logger.error(f"Error downloading file {upload.filename}: {e}")
            return None

    async def _create_thumbnail(self, image_path: Path, size: Tuple[int, int]) -> Optional[Path]:
        """Create a thumbnail from an image."""
        try:
            thumbnail_path = self.temp_dir / f"thumb_{size[0]}x{size[1]}_{image_path.name}"

            with Image.open(image_path) as img:
                # Fix orientation
                img = ImageOps.exif_transpose(img)

                # Create thumbnail
                img.thumbnail(size, Image.Resampling.LANCZOS)

                # Convert to RGB if necessary
                if img.mode in ('RGBA', 'LA'):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    if img.mode == 'LA':
                        img = img.convert('RGBA')
                    background.paste(img, mask=img.split()[-1])
                    img = background
                elif img.mode != 'RGB':
                    img = img.convert('RGB')

                # Save thumbnail
                img.save(
                    thumbnail_path,
                    'JPEG',
                    quality=self.image_quality,
                    optimize=True
                )

            return thumbnail_path

        except Exception as e:
            logger.error(f"Error creating thumbnail: {e}")
            return None

    async def _create_video_thumbnail(self, video_path: Path) -> Optional[Path]:
        """Create a thumbnail from a video file."""
        try:
            # This would require ffmpeg to extract a frame from the video.
            # For now, return a placeholder.
            return None

        except Exception as e:
            logger.error(f"Error creating video thumbnail: {e}")
            return None

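    # The placeholder above never produces a thumbnail. Below is a minimal
    # sketch of frame extraction, assuming ffmpeg is installed and on PATH;
    # the one-second seek offset and the helper name are illustrative.
    async def _extract_video_frame(self, video_path: Path) -> Optional[Path]:
        """Sketch: grab a single frame with ffmpeg for use as a thumbnail."""
        frame_path = self.temp_dir / f"frame_{video_path.stem}.jpg"
        try:
            proc = await asyncio.create_subprocess_exec(
                "ffmpeg", "-y", "-ss", "1", "-i", str(video_path),
                "-frames:v", "1", "-q:v", "2", str(frame_path),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await proc.wait()
            if proc.returncode == 0 and frame_path.exists():
                return frame_path
            return None
        except Exception as e:
            logger.error(f"Error extracting video frame: {e}")
            return None
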
    async def _cleanup_temp_files_loop(self) -> None:
        """Loop for cleaning up temporary files."""
        logger.info("Starting temp file cleanup loop")

        while self.is_running:
            try:
                await self._cleanup_old_temp_files()
                await asyncio.sleep(3600)  # Run every hour

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in temp cleanup loop: {e}")
                await asyncio.sleep(3600)

    async def _cleanup_old_temp_files(self) -> None:
        """Clean up old temporary files."""
        try:
            current_time = datetime.now().timestamp()

            for file_path in self.temp_dir.glob("*"):
                if file_path.is_file():
                    # Remove files older than 1 hour
                    if current_time - file_path.stat().st_mtime > 3600:
                        file_path.unlink()
                        logger.debug(f"Removed old temp file: {file_path}")

        except Exception as e:
            logger.error(f"Error cleaning up temp files: {e}")

    async def _cleanup_temp_directory(self) -> None:
        """Clean up the entire temp directory."""
        try:
            for file_path in self.temp_dir.glob("*"):
                if file_path.is_file():
                    file_path.unlink()

        except Exception as e:
            logger.error(f"Error cleaning up temp directory: {e}")

    async def _retry_failed_conversions_loop(self) -> None:
        """Loop for retrying failed conversions."""
        logger.info("Starting retry loop for failed conversions")

        while self.is_running:
            try:
                await self._retry_failed_conversions()
                await asyncio.sleep(1800)  # Run every 30 minutes

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in retry loop: {e}")
                await asyncio.sleep(1800)

    async def _retry_failed_conversions(self) -> None:
        """Retry failed conversions that haven't exceeded max retries."""
        async with get_async_session() as session:
            try:
                # Get failed uploads that can be retried
                result = await session.execute(
                    select(FileUpload)
                    .where(
                        FileUpload.status == "failed",
                        FileUpload.processed == False,
                        (FileUpload.retry_count < self.max_retries) | (FileUpload.retry_count.is_(None))
                    )
                    .limit(5)  # Smaller batch for retries
                )
                uploads = result.scalars().all()

                for upload in uploads:
                    logger.info(f"Retrying failed conversion for: {upload.filename}")

                    # Reset status
                    upload.status = "uploaded"
                    upload.error_message = None

                    # Process the file
                    await self._process_single_file(session, upload)

                await session.commit()

            except Exception as e:
                logger.error(f"Error retrying failed conversions: {e}")
                await session.rollback()

    async def queue_file_for_processing(self, upload_id: str) -> bool:
        """Queue a file for processing."""
        try:
            if not self.redis_client:
                logger.warning("Cannot queue file: Redis client not initialized")
                return False

            # Add to processing queue
            queue_key = "conversion_queue"
            await self.redis_client.lpush(queue_key, upload_id)

            logger.info(f"Queued file {upload_id} for processing")
            return True

        except Exception as e:
            logger.error(f"Error queuing file for processing: {e}")
            return False

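    # Nothing in this module consumes "conversion_queue"; processing is driven
    # by the polling loop instead. Below is a minimal sketch of what a queue
    # consumer could look like, assuming queued IDs should feed the same
    # _process_single_file path; the loop is illustrative and is not wired
    # into start().
    async def _consume_queue_loop(self) -> None:
        """Sketch: pop queued upload IDs and process them as they arrive."""
        while self.is_running:
            try:
                # Block up to 5 seconds waiting for a queued ID
                item = await self.redis_client.brpop("conversion_queue", timeout=5)
                if not item:
                    continue
                _, upload_id = item
                async with get_async_session() as session:
                    upload = await session.get(FileUpload, upload_id)
                    if upload:
                        await self._process_single_file(session, upload)
                        await session.commit()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error consuming conversion queue: {e}")
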
    async def get_processing_stats(self) -> Dict[str, Any]:
        """Get processing statistics."""
        try:
            async with get_async_session() as session:
                # Get upload stats by status
                status_result = await session.execute(
                    select(FileUpload.status, func.count())
                    .group_by(FileUpload.status)
                )
                status_stats = dict(status_result.fetchall())

                # Get processing stats
                processed_result = await session.execute(
                    select(func.count())
                    .select_from(FileUpload)
                    .where(FileUpload.processed == True)
                )
                processed_count = processed_result.scalar()

                # Get failed stats
                failed_result = await session.execute(
                    select(func.count())
                    .select_from(FileUpload)
                    .where(FileUpload.status == "failed")
                )
                failed_count = failed_result.scalar()

                return {
                    "status_stats": status_stats,
                    "processed_count": processed_count,
                    "failed_count": failed_count,
                    "is_running": self.is_running,
                    "active_tasks": len([t for t in self.tasks if not t.done()]),
                    "temp_files": len(list(self.temp_dir.glob("*"))),
                    "last_update": datetime.utcnow().isoformat()
                }

        except Exception as e:
            logger.error(f"Error getting processing stats: {e}")
            return {"error": str(e)}


# Global converter instance
convert_service = ConvertService()
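
# A minimal sketch of running the service standalone; this entry point is an
# assumption and not part of the original module.
if __name__ == "__main__":
    async def _main() -> None:
        try:
            await convert_service.start()
        finally:
            await convert_service.stop()

    asyncio.run(_main())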