"""Media conversion service for processing uploaded files.""" import asyncio import hashlib import json import logging import os import tempfile from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Set, Any, Tuple import aiofiles import redis.asyncio as redis from PIL import Image, ImageOps from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import get_settings from app.core.database import get_async_session from app.core.models.content import Content, FileUpload from app.core.storage import storage_manager logger = logging.getLogger(__name__) class ConvertService: """Service for converting and processing uploaded media files.""" def __init__(self): self.settings = get_settings() self.redis_client: Optional[redis.Redis] = None self.is_running = False self.tasks: Set[asyncio.Task] = set() # Conversion configuration self.batch_size = 10 self.process_interval = 5 # seconds self.max_retries = 3 self.temp_dir = Path(tempfile.gettempdir()) / "uploader_convert" self.temp_dir.mkdir(exist_ok=True) # Supported formats self.image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff'} self.video_formats = {'.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.mkv'} self.audio_formats = {'.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac'} self.document_formats = {'.pdf', '.doc', '.docx', '.txt', '.rtf'} # Image processing settings self.thumbnail_sizes = [(150, 150), (300, 300), (800, 600)] self.image_quality = 85 self.max_image_size = (2048, 2048) async def start(self) -> None: """Start the conversion service.""" try: logger.info("Starting media conversion service") # Initialize Redis connection self.redis_client = redis.from_url( self.settings.redis_url, encoding="utf-8", decode_responses=True, socket_keepalive=True, socket_keepalive_options={}, health_check_interval=30, ) # Test Redis connection await self.redis_client.ping() logger.info("Redis connection established for converter") # Start conversion tasks self.is_running = True # Create conversion tasks tasks = [ asyncio.create_task(self._process_pending_files_loop()), asyncio.create_task(self._cleanup_temp_files_loop()), asyncio.create_task(self._retry_failed_conversions_loop()), ] self.tasks.update(tasks) # Wait for all tasks await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: logger.error(f"Error starting conversion service: {e}") await self.stop() raise async def stop(self) -> None: """Stop the conversion service.""" logger.info("Stopping media conversion service") self.is_running = False # Cancel all tasks for task in self.tasks: if not task.done(): task.cancel() # Wait for tasks to complete if self.tasks: await asyncio.gather(*self.tasks, return_exceptions=True) # Close Redis connection if self.redis_client: await self.redis_client.close() # Cleanup temp directory await self._cleanup_temp_directory() logger.info("Conversion service stopped") async def _process_pending_files_loop(self) -> None: """Main loop for processing pending file conversions.""" logger.info("Starting file conversion loop") while self.is_running: try: await self._process_pending_files() await asyncio.sleep(self.process_interval) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in file conversion loop: {e}") await asyncio.sleep(self.process_interval) async def _process_pending_files(self) -> None: """Process pending file conversions.""" async with get_async_session() as session: try: # Get pending uploads result = 
    async def _process_pending_files(self) -> None:
        """Process pending file conversions."""
        async with get_async_session() as session:
            try:
                # Get pending uploads
                result = await session.execute(
                    select(FileUpload)
                    .where(
                        FileUpload.status == "uploaded",
                        FileUpload.processed == False
                    )
                    .limit(self.batch_size)
                )
                uploads = result.scalars().all()

                if not uploads:
                    return

                logger.info(f"Processing {len(uploads)} pending files")

                # Process each upload
                for upload in uploads:
                    await self._process_single_file(session, upload)

                await session.commit()

            except Exception as e:
                logger.error(f"Error processing pending files: {e}")
                await session.rollback()

    async def _process_single_file(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process a single file upload."""
        try:
            logger.info(f"Processing file: {upload.filename}")

            # Mark as processing
            upload.status = "processing"
            upload.processing_started_at = datetime.utcnow()
            await session.commit()

            # Get file extension
            file_ext = Path(upload.filename).suffix.lower()

            # Process based on file type
            if file_ext in self.image_formats:
                await self._process_image(session, upload)
            elif file_ext in self.video_formats:
                await self._process_video(session, upload)
            elif file_ext in self.audio_formats:
                await self._process_audio(session, upload)
            elif file_ext in self.document_formats:
                await self._process_document(session, upload)
            else:
                # Just mark as processed for unsupported formats
                upload.status = "completed"
                upload.processed = True
                upload.processing_completed_at = datetime.utcnow()

            # Cache processing result
            cache_key = f"processed:{upload.id}"
            processing_info = {
                "status": upload.status,
                "processed_at": datetime.utcnow().isoformat(),
                "metadata": upload.metadata or {}
            }
            await self.redis_client.setex(cache_key, 3600, json.dumps(processing_info))

        except Exception as e:
            logger.error(f"Error processing file {upload.filename}: {e}")

            # Mark as failed
            upload.status = "failed"
            upload.error_message = str(e)
            upload.retry_count = (upload.retry_count or 0) + 1

            if upload.retry_count >= self.max_retries:
                upload.processed = True  # Stop retrying
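    # --- Hedged sketch (not part of the original flow) --------------------
    # _process_single_file above routes purely on the filename extension,
    # so a mislabeled file (e.g. a PNG saved as report.pdf) goes to the
    # wrong handler. A minimal sketch of MIME-based routing with the stdlib
    # mimetypes module; the returned kind strings are illustrative only.
    def _guess_media_kind_sketch(self, filename: str) -> str:
        """Sketch: classify a file by guessed MIME type, falling back to extension."""
        import mimetypes

        mime_type, _ = mimetypes.guess_type(filename)
        if mime_type:
            primary = mime_type.split("/", 1)[0]
            if primary in ("image", "video", "audio"):
                return primary
            if mime_type == "application/pdf":
                return "document"
        # Fall back to the extension sets defined in __init__
        if Path(filename).suffix.lower() in self.document_formats:
            return "document"
        return "other"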
    async def _process_image(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process an image file."""
        try:
            # Download original file
            original_path = await self._download_file(upload)
            if not original_path:
                raise Exception("Failed to download original file")

            optimized_path = self.temp_dir / f"optimized_{upload.id}.jpg"

            # Open image
            with Image.open(original_path) as img:
                # Extract metadata
                metadata = {
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "has_transparency": img.mode in ('RGBA', 'LA') or 'transparency' in img.info
                }

                # Fix orientation
                img = ImageOps.exif_transpose(img)

                # Resize if too large
                if img.size[0] > self.max_image_size[0] or img.size[1] > self.max_image_size[1]:
                    img.thumbnail(self.max_image_size, Image.Resampling.LANCZOS)
                    metadata["resized"] = True

                # Convert to RGB if necessary
                if img.mode in ('RGBA', 'LA'):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    if img.mode == 'LA':
                        img = img.convert('RGBA')
                    background.paste(img, mask=img.split()[-1])
                    img = background
                elif img.mode != 'RGB':
                    img = img.convert('RGB')

                # Save optimized version
                img.save(
                    optimized_path,
                    'JPEG',
                    quality=self.image_quality,
                    optimize=True
                )

            # Upload optimized version
            optimized_url = await storage_manager.upload_file(
                str(optimized_path),
                f"optimized/{upload.id}/image.jpg"
            )

            # Generate thumbnails
            thumbnails = {}
            for size in self.thumbnail_sizes:
                thumbnail_path = await self._create_thumbnail(original_path, size)
                if thumbnail_path:
                    thumb_url = await storage_manager.upload_file(
                        str(thumbnail_path),
                        f"thumbnails/{upload.id}/{size[0]}x{size[1]}.jpg"
                    )
                    thumbnails[f"{size[0]}x{size[1]}"] = thumb_url
                    thumbnail_path.unlink()  # Cleanup

            # Update upload record
            upload.metadata = {
                **metadata,
                "thumbnails": thumbnails,
                "optimized_url": optimized_url
            }
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup temp files
            original_path.unlink()
            optimized_path.unlink()

        except Exception as e:
            logger.error(f"Error processing image {upload.filename}: {e}")
            raise

    async def _process_video(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process a video file."""
        try:
            # For video processing, we would typically use ffmpeg.
            # This is a simplified version that just extracts basic info.
            original_path = await self._download_file(upload)
            if not original_path:
                raise Exception("Failed to download original file")

            # Basic video metadata (would use ffprobe in a real implementation)
            metadata = {
                "type": "video",
                "file_size": original_path.stat().st_size,
                "processing_note": "Video processing requires ffmpeg implementation"
            }

            # Generate video thumbnail (simplified)
            thumbnail_path = await self._create_video_thumbnail(original_path)
            if thumbnail_path:
                thumb_url = await storage_manager.upload_file(
                    str(thumbnail_path),
                    f"thumbnails/{upload.id}/video_thumb.jpg"
                )
                metadata["thumbnail"] = thumb_url
                thumbnail_path.unlink()

            # Update upload record
            upload.metadata = metadata
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup
            original_path.unlink()

        except Exception as e:
            logger.error(f"Error processing video {upload.filename}: {e}")
            raise

    async def _process_audio(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process an audio file."""
        try:
            original_path = await self._download_file(upload)
            if not original_path:
                raise Exception("Failed to download original file")

            # Basic audio metadata
            metadata = {
                "type": "audio",
                "file_size": original_path.stat().st_size,
                "processing_note": "Audio processing requires additional libraries"
            }

            # Update upload record
            upload.metadata = metadata
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup
            original_path.unlink()

        except Exception as e:
            logger.error(f"Error processing audio {upload.filename}: {e}")
            raise

    async def _process_document(self, session: AsyncSession, upload: FileUpload) -> None:
        """Process a document file."""
        try:
            original_path = await self._download_file(upload)
            if not original_path:
                raise Exception("Failed to download original file")

            # Basic document metadata
            metadata = {
                "type": "document",
                "file_size": original_path.stat().st_size,
                "pages": 1,  # Would extract actual page count for PDFs
                "processing_note": "Document processing requires additional libraries"
            }

            # Update upload record
            upload.metadata = metadata
            upload.status = "completed"
            upload.processed = True
            upload.processing_completed_at = datetime.utcnow()

            # Cleanup
            original_path.unlink()

        except Exception as e:
            logger.error(f"Error processing document {upload.filename}: {e}")
            raise
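    # --- Hedged sketch (not part of the original flow) --------------------
    # _process_video above stores placeholder metadata and notes that a
    # real implementation would use ffprobe. A minimal sketch of that
    # extraction, assuming the ffprobe binary is on PATH; the returned
    # dict is ffprobe's standard -show_format/-show_streams JSON output.
    async def _probe_video_metadata_sketch(self, video_path: Path) -> Optional[Dict[str, Any]]:
        """Sketch: extract container/stream metadata via ffprobe."""
        proc = await asyncio.create_subprocess_exec(
            "ffprobe",
            "-v", "error",
            "-print_format", "json",
            "-show_format",
            "-show_streams",
            str(video_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            logger.error(f"ffprobe failed: {stderr.decode(errors='replace')}")
            return None
        return json.loads(stdout)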
    async def _download_file(self, upload: FileUpload) -> Optional[Path]:
        """Download a file for processing."""
        try:
            if not upload.file_path:
                return None

            # Create temp file path
            temp_path = self.temp_dir / f"original_{upload.id}_{upload.filename}"

            # Download file from storage
            file_data = await storage_manager.get_file(upload.file_path)
            if not file_data:
                return None

            # Write to temp file
            async with aiofiles.open(temp_path, 'wb') as f:
                await f.write(file_data)

            return temp_path

        except Exception as e:
            logger.error(f"Error downloading file {upload.filename}: {e}")
            return None

    async def _create_thumbnail(self, image_path: Path, size: Tuple[int, int]) -> Optional[Path]:
        """Create a thumbnail from an image."""
        try:
            thumbnail_path = self.temp_dir / f"thumb_{size[0]}x{size[1]}_{image_path.name}"

            with Image.open(image_path) as img:
                # Fix orientation
                img = ImageOps.exif_transpose(img)

                # Create thumbnail
                img.thumbnail(size, Image.Resampling.LANCZOS)

                # Convert to RGB if necessary
                if img.mode in ('RGBA', 'LA'):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    if img.mode == 'LA':
                        img = img.convert('RGBA')
                    background.paste(img, mask=img.split()[-1])
                    img = background
                elif img.mode != 'RGB':
                    img = img.convert('RGB')

                # Save thumbnail
                img.save(
                    thumbnail_path,
                    'JPEG',
                    quality=self.image_quality,
                    optimize=True
                )

            return thumbnail_path

        except Exception as e:
            logger.error(f"Error creating thumbnail: {e}")
            return None

    async def _create_video_thumbnail(self, video_path: Path) -> Optional[Path]:
        """Create a thumbnail from a video file."""
        try:
            # This would require ffmpeg to extract a frame from the video.
            # For now, return a placeholder.
            return None
        except Exception as e:
            logger.error(f"Error creating video thumbnail: {e}")
            return None

    async def _cleanup_temp_files_loop(self) -> None:
        """Loop for cleaning up temporary files."""
        logger.info("Starting temp file cleanup loop")

        while self.is_running:
            try:
                await self._cleanup_old_temp_files()
                await asyncio.sleep(3600)  # Run every hour
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in temp cleanup loop: {e}")
                await asyncio.sleep(3600)

    async def _cleanup_old_temp_files(self) -> None:
        """Clean up old temporary files."""
        try:
            current_time = datetime.now().timestamp()

            for file_path in self.temp_dir.glob("*"):
                if file_path.is_file():
                    # Remove files older than 1 hour
                    if current_time - file_path.stat().st_mtime > 3600:
                        file_path.unlink()
                        logger.debug(f"Removed old temp file: {file_path}")

        except Exception as e:
            logger.error(f"Error cleaning up temp files: {e}")

    async def _cleanup_temp_directory(self) -> None:
        """Clean up the entire temp directory."""
        try:
            for file_path in self.temp_dir.glob("*"):
                if file_path.is_file():
                    file_path.unlink()
        except Exception as e:
            logger.error(f"Error cleaning up temp directory: {e}")

    async def _retry_failed_conversions_loop(self) -> None:
        """Loop for retrying failed conversions."""
        logger.info("Starting retry loop for failed conversions")

        while self.is_running:
            try:
                await self._retry_failed_conversions()
                await asyncio.sleep(1800)  # Run every 30 minutes
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in retry loop: {e}")
                await asyncio.sleep(1800)

    async def _retry_failed_conversions(self) -> None:
        """Retry failed conversions that haven't exceeded max retries."""
        async with get_async_session() as session:
            try:
                # Get failed uploads that can be retried
                result = await session.execute(
                    select(FileUpload)
                    .where(
                        FileUpload.status == "failed",
                        FileUpload.processed == False,
                        (FileUpload.retry_count < self.max_retries)
                        | (FileUpload.retry_count.is_(None))
                    )
                    .limit(5)  # Smaller batch for retries
                )
                uploads = result.scalars().all()

                for upload in uploads:
                    logger.info(f"Retrying failed conversion for: {upload.filename}")

                    # Reset status
                    upload.status = "uploaded"
                    upload.error_message = None

                    # Process the file
                    await self._process_single_file(session, upload)

                await session.commit()

            except Exception as e:
                logger.error(f"Error retrying failed conversions: {e}")
                await session.rollback()
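    # --- Hedged sketch (not part of the original flow) --------------------
    # _create_video_thumbnail above returns None because frame extraction
    # needs ffmpeg. A minimal sketch of grabbing a single frame, assuming
    # the ffmpeg binary is on PATH; the 1-second seek offset is an
    # arbitrary illustrative choice, not a value from the original code.
    async def _extract_video_frame_sketch(self, video_path: Path) -> Optional[Path]:
        """Sketch: write one JPEG frame from a video with ffmpeg."""
        frame_path = self.temp_dir / f"frame_{video_path.stem}.jpg"
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-y",              # overwrite any stale temp frame
            "-ss", "1",        # seek 1 second in to skip black lead-ins
            "-i", str(video_path),
            "-frames:v", "1",  # emit exactly one frame
            str(frame_path),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        await proc.communicate()
        return frame_path if proc.returncode == 0 and frame_path.exists() else None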
    async def queue_file_for_processing(self, upload_id: str) -> bool:
        """Queue a file for processing."""
        try:
            # Add to processing queue
            queue_key = "conversion_queue"
            await self.redis_client.lpush(queue_key, upload_id)

            logger.info(f"Queued file {upload_id} for processing")
            return True

        except Exception as e:
            logger.error(f"Error queuing file for processing: {e}")
            return False

    async def get_processing_stats(self) -> Dict[str, Any]:
        """Get processing statistics."""
        try:
            async with get_async_session() as session:
                # Get upload stats by status
                status_result = await session.execute(
                    select(FileUpload.status, func.count())
                    .group_by(FileUpload.status)
                )
                status_stats = dict(status_result.fetchall())

                # Get processing stats
                processed_result = await session.execute(
                    select(func.count())
                    .select_from(FileUpload)
                    .where(FileUpload.processed == True)
                )
                processed_count = processed_result.scalar()

                # Get failed stats
                failed_result = await session.execute(
                    select(func.count())
                    .select_from(FileUpload)
                    .where(FileUpload.status == "failed")
                )
                failed_count = failed_result.scalar()

                return {
                    "status_stats": status_stats,
                    "processed_count": processed_count,
                    "failed_count": failed_count,
                    "is_running": self.is_running,
                    "active_tasks": len([t for t in self.tasks if not t.done()]),
                    "temp_files": len(list(self.temp_dir.glob("*"))),
                    "last_update": datetime.utcnow().isoformat()
                }

        except Exception as e:
            logger.error(f"Error getting processing stats: {e}")
            return {"error": str(e)}


# Global converter instance
convert_service = ConvertService()
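
# --- Hedged usage sketch ---------------------------------------------------
# A minimal illustration of running the service standalone; in the real
# application the start/stop calls presumably hook into the host app's
# lifecycle, which is not shown in this module.
if __name__ == "__main__":
    async def _main() -> None:
        try:
            # start() blocks until the background tasks finish or fail
            await convert_service.start()
        finally:
            await convert_service.stop()

    asyncio.run(_main())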