uploader-bot/app/core/background/convert_service.py

603 lines
23 KiB
Python

"""Media conversion service for processing uploaded files."""
import asyncio
import hashlib
import json
import logging
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Any, Tuple

import aiofiles
import redis.asyncio as redis
from PIL import Image, ImageOps
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import get_settings
from app.core.database import get_async_session
from app.core.models.content import Content, FileUpload
from app.core.storage import storage_manager
logger = logging.getLogger(__name__)
class ConvertService:
    """Service for converting and processing uploaded media files."""

    def __init__(self) -> None:
        self.settings = get_settings()
        self.redis_client: Optional[redis.Redis] = None
        self.is_running = False
        self.tasks: Set[asyncio.Task] = set()

        # Scheduling and retry knobs.
        self.batch_size = 10
        self.process_interval = 5  # seconds between pending-file scans
        self.max_retries = 3

        # Scratch space for downloaded originals and generated derivatives.
        self.temp_dir = Path(tempfile.gettempdir()) / "uploader_convert"
        self.temp_dir.mkdir(exist_ok=True)

        # File-type routing tables, matched against lower-cased suffixes.
        self.image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff'}
        self.video_formats = {'.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.mkv'}
        self.audio_formats = {'.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac'}
        self.document_formats = {'.pdf', '.doc', '.docx', '.txt', '.rtf'}

        # Image pipeline settings.
        self.thumbnail_sizes = [(150, 150), (300, 300), (800, 600)]
        self.image_quality = 85
        self.max_image_size = (2048, 2048)
async def start(self) -> None:
"""Start the conversion service."""
try:
logger.info("Starting media conversion service")
# Initialize Redis connection
self.redis_client = redis.from_url(
self.settings.redis_url,
encoding="utf-8",
decode_responses=True,
socket_keepalive=True,
socket_keepalive_options={},
health_check_interval=30,
)
# Test Redis connection
await self.redis_client.ping()
logger.info("Redis connection established for converter")
# Start conversion tasks
self.is_running = True
# Create conversion tasks
tasks = [
asyncio.create_task(self._process_pending_files_loop()),
asyncio.create_task(self._cleanup_temp_files_loop()),
asyncio.create_task(self._retry_failed_conversions_loop()),
]
self.tasks.update(tasks)
# Wait for all tasks
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
logger.error(f"Error starting conversion service: {e}")
await self.stop()
raise
async def stop(self) -> None:
"""Stop the conversion service."""
logger.info("Stopping media conversion service")
self.is_running = False
# Cancel all tasks
for task in self.tasks:
if not task.done():
task.cancel()
# Wait for tasks to complete
if self.tasks:
await asyncio.gather(*self.tasks, return_exceptions=True)
# Close Redis connection
if self.redis_client:
await self.redis_client.close()
# Cleanup temp directory
await self._cleanup_temp_directory()
logger.info("Conversion service stopped")
async def _process_pending_files_loop(self) -> None:
"""Main loop for processing pending file conversions."""
logger.info("Starting file conversion loop")
while self.is_running:
try:
await self._process_pending_files()
await asyncio.sleep(self.process_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in file conversion loop: {e}")
await asyncio.sleep(self.process_interval)
async def _process_pending_files(self) -> None:
"""Process pending file conversions."""
async with get_async_session() as session:
try:
# Get pending uploads
result = await session.execute(
select(FileUpload)
.where(
FileUpload.status == "uploaded",
FileUpload.processed == False
)
.limit(self.batch_size)
)
uploads = result.scalars().all()
if not uploads:
return
logger.info(f"Processing {len(uploads)} pending files")
# Process each upload
for upload in uploads:
await self._process_single_file(session, upload)
await session.commit()
except Exception as e:
logger.error(f"Error processing pending files: {e}")
await session.rollback()
async def _process_single_file(self, session: AsyncSession, upload: FileUpload) -> None:
"""Process a single file upload."""
try:
logger.info(f"Processing file: {upload.filename}")
# Mark as processing
upload.status = "processing"
upload.processing_started_at = datetime.utcnow()
await session.commit()
# Get file extension
file_ext = Path(upload.filename).suffix.lower()
# Process based on file type
if file_ext in self.image_formats:
await self._process_image(session, upload)
elif file_ext in self.video_formats:
await self._process_video(session, upload)
elif file_ext in self.audio_formats:
await self._process_audio(session, upload)
elif file_ext in self.document_formats:
await self._process_document(session, upload)
else:
# Just mark as processed for unsupported formats
upload.status = "completed"
upload.processed = True
upload.processing_completed_at = datetime.utcnow()
# Cache processing result
cache_key = f"processed:{upload.id}"
processing_info = {
"status": upload.status,
"processed_at": datetime.utcnow().isoformat(),
"metadata": upload.metadata or {}
}
await self.redis_client.setex(cache_key, 3600, json.dumps(processing_info))
except Exception as e:
logger.error(f"Error processing file {upload.filename}: {e}")
# Mark as failed
upload.status = "failed"
upload.error_message = str(e)
upload.retry_count = (upload.retry_count or 0) + 1
if upload.retry_count >= self.max_retries:
upload.processed = True # Stop retrying
async def _process_image(self, session: AsyncSession, upload: FileUpload) -> None:
"""Process an image file."""
try:
# Download original file
original_path = await self._download_file(upload)
if not original_path:
raise Exception("Failed to download original file")
# Open image
with Image.open(original_path) as img:
# Extract metadata
metadata = {
"format": img.format,
"mode": img.mode,
"size": img.size,
"has_transparency": img.mode in ('RGBA', 'LA') or 'transparency' in img.info
}
# Fix orientation
img = ImageOps.exif_transpose(img)
# Resize if too large
if img.size[0] > self.max_image_size[0] or img.size[1] > self.max_image_size[1]:
img.thumbnail(self.max_image_size, Image.Resampling.LANCZOS)
metadata["resized"] = True
# Save optimized version
optimized_path = self.temp_dir / f"optimized_{upload.id}.jpg"
# Convert to RGB if necessary
if img.mode in ('RGBA', 'LA'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'LA':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1])
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
img.save(
optimized_path,
'JPEG',
quality=self.image_quality,
optimize=True
)
# Upload optimized version
optimized_url = await storage_manager.upload_file(
str(optimized_path),
f"optimized/{upload.id}/image.jpg"
)
# Generate thumbnails
thumbnails = {}
for size in self.thumbnail_sizes:
thumbnail_path = await self._create_thumbnail(original_path, size)
if thumbnail_path:
thumb_url = await storage_manager.upload_file(
str(thumbnail_path),
f"thumbnails/{upload.id}/{size[0]}x{size[1]}.jpg"
)
thumbnails[f"{size[0]}x{size[1]}"] = thumb_url
thumbnail_path.unlink() # Cleanup
# Update upload record
upload.metadata = {
**metadata,
"thumbnails": thumbnails,
"optimized_url": optimized_url
}
upload.status = "completed"
upload.processed = True
upload.processing_completed_at = datetime.utcnow()
# Cleanup temp files
original_path.unlink()
optimized_path.unlink()
except Exception as e:
logger.error(f"Error processing image {upload.filename}: {e}")
raise
async def _process_video(self, session: AsyncSession, upload: FileUpload) -> None:
"""Process a video file."""
try:
# For video processing, we would typically use ffmpeg
# This is a simplified version that just extracts basic info
original_path = await self._download_file(upload)
if not original_path:
raise Exception("Failed to download original file")
# Basic video metadata (would use ffprobe in real implementation)
metadata = {
"type": "video",
"file_size": original_path.stat().st_size,
"processing_note": "Video processing requires ffmpeg implementation"
}
# Generate video thumbnail (simplified)
thumbnail_path = await self._create_video_thumbnail(original_path)
if thumbnail_path:
thumb_url = await storage_manager.upload_file(
str(thumbnail_path),
f"thumbnails/{upload.id}/video_thumb.jpg"
)
metadata["thumbnail"] = thumb_url
thumbnail_path.unlink()
# Update upload record
upload.metadata = metadata
upload.status = "completed"
upload.processed = True
upload.processing_completed_at = datetime.utcnow()
# Cleanup
original_path.unlink()
except Exception as e:
logger.error(f"Error processing video {upload.filename}: {e}")
raise
async def _process_audio(self, session: AsyncSession, upload: FileUpload) -> None:
"""Process an audio file."""
try:
original_path = await self._download_file(upload)
if not original_path:
raise Exception("Failed to download original file")
# Basic audio metadata
metadata = {
"type": "audio",
"file_size": original_path.stat().st_size,
"processing_note": "Audio processing requires additional libraries"
}
# Update upload record
upload.metadata = metadata
upload.status = "completed"
upload.processed = True
upload.processing_completed_at = datetime.utcnow()
# Cleanup
original_path.unlink()
except Exception as e:
logger.error(f"Error processing audio {upload.filename}: {e}")
raise
async def _process_document(self, session: AsyncSession, upload: FileUpload) -> None:
"""Process a document file."""
try:
original_path = await self._download_file(upload)
if not original_path:
raise Exception("Failed to download original file")
# Basic document metadata
metadata = {
"type": "document",
"file_size": original_path.stat().st_size,
"pages": 1, # Would extract actual page count for PDFs
"processing_note": "Document processing requires additional libraries"
}
# Update upload record
upload.metadata = metadata
upload.status = "completed"
upload.processed = True
upload.processing_completed_at = datetime.utcnow()
# Cleanup
original_path.unlink()
except Exception as e:
logger.error(f"Error processing document {upload.filename}: {e}")
raise
async def _download_file(self, upload: FileUpload) -> Optional[Path]:
"""Download a file for processing."""
try:
if not upload.file_path:
return None
# Create temp file path
temp_path = self.temp_dir / f"original_{upload.id}_{upload.filename}"
# Download file from storage
file_data = await storage_manager.get_file(upload.file_path)
if not file_data:
return None
# Write to temp file
async with aiofiles.open(temp_path, 'wb') as f:
await f.write(file_data)
return temp_path
except Exception as e:
logger.error(f"Error downloading file {upload.filename}: {e}")
return None
async def _create_thumbnail(self, image_path: Path, size: Tuple[int, int]) -> Optional[Path]:
"""Create a thumbnail from an image."""
try:
thumbnail_path = self.temp_dir / f"thumb_{size[0]}x{size[1]}_{image_path.name}"
with Image.open(image_path) as img:
# Fix orientation
img = ImageOps.exif_transpose(img)
# Create thumbnail
img.thumbnail(size, Image.Resampling.LANCZOS)
# Convert to RGB if necessary
if img.mode in ('RGBA', 'LA'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'LA':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1])
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
# Save thumbnail
img.save(
thumbnail_path,
'JPEG',
quality=self.image_quality,
optimize=True
)
return thumbnail_path
except Exception as e:
logger.error(f"Error creating thumbnail: {e}")
return None
async def _create_video_thumbnail(self, video_path: Path) -> Optional[Path]:
"""Create a thumbnail from a video file."""
try:
# This would require ffmpeg to extract a frame from the video
# For now, return a placeholder
return None
except Exception as e:
logger.error(f"Error creating video thumbnail: {e}")
return None
async def _cleanup_temp_files_loop(self) -> None:
"""Loop for cleaning up temporary files."""
logger.info("Starting temp file cleanup loop")
while self.is_running:
try:
await self._cleanup_old_temp_files()
await asyncio.sleep(3600) # Run every hour
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in temp cleanup loop: {e}")
await asyncio.sleep(3600)
async def _cleanup_old_temp_files(self) -> None:
"""Clean up old temporary files."""
try:
current_time = datetime.now().timestamp()
for file_path in self.temp_dir.glob("*"):
if file_path.is_file():
# Remove files older than 1 hour
if current_time - file_path.stat().st_mtime > 3600:
file_path.unlink()
logger.debug(f"Removed old temp file: {file_path}")
except Exception as e:
logger.error(f"Error cleaning up temp files: {e}")
async def _cleanup_temp_directory(self) -> None:
"""Clean up the entire temp directory."""
try:
for file_path in self.temp_dir.glob("*"):
if file_path.is_file():
file_path.unlink()
except Exception as e:
logger.error(f"Error cleaning up temp directory: {e}")
async def _retry_failed_conversions_loop(self) -> None:
"""Loop for retrying failed conversions."""
logger.info("Starting retry loop for failed conversions")
while self.is_running:
try:
await self._retry_failed_conversions()
await asyncio.sleep(1800) # Run every 30 minutes
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in retry loop: {e}")
await asyncio.sleep(1800)
async def _retry_failed_conversions(self) -> None:
"""Retry failed conversions that haven't exceeded max retries."""
async with get_async_session() as session:
try:
# Get failed uploads that can be retried
result = await session.execute(
select(FileUpload)
.where(
FileUpload.status == "failed",
FileUpload.processed == False,
(FileUpload.retry_count < self.max_retries) | (FileUpload.retry_count.is_(None))
)
.limit(5) # Smaller batch for retries
)
uploads = result.scalars().all()
for upload in uploads:
logger.info(f"Retrying failed conversion for: {upload.filename}")
# Reset status
upload.status = "uploaded"
upload.error_message = None
# Process the file
await self._process_single_file(session, upload)
await session.commit()
except Exception as e:
logger.error(f"Error retrying failed conversions: {e}")
await session.rollback()
async def queue_file_for_processing(self, upload_id: str) -> bool:
"""Queue a file for processing."""
try:
# Add to processing queue
queue_key = "conversion_queue"
await self.redis_client.lpush(queue_key, upload_id)
logger.info(f"Queued file {upload_id} for processing")
return True
except Exception as e:
logger.error(f"Error queuing file for processing: {e}")
return False
async def get_processing_stats(self) -> Dict[str, Any]:
"""Get processing statistics."""
try:
async with get_async_session() as session:
# Get upload stats by status
status_result = await session.execute(
select(FileUpload.status, asyncio.func.count())
.group_by(FileUpload.status)
)
status_stats = dict(status_result.fetchall())
# Get processing stats
processed_result = await session.execute(
select(asyncio.func.count())
.select_from(FileUpload)
.where(FileUpload.processed == True)
)
processed_count = processed_result.scalar()
# Get failed stats
failed_result = await session.execute(
select(asyncio.func.count())
.select_from(FileUpload)
.where(FileUpload.status == "failed")
)
failed_count = failed_result.scalar()
return {
"status_stats": status_stats,
"processed_count": processed_count,
"failed_count": failed_count,
"is_running": self.is_running,
"active_tasks": len([t for t in self.tasks if not t.done()]),
"temp_files": len(list(self.temp_dir.glob("*"))),
"last_update": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Error getting processing stats: {e}")
return {"error": str(e)}
# Module-level singleton: importers share this single ConvertService instance.
convert_service = ConvertService()