""" Storage management routes with chunked uploads, download handling, and file operations. Provides secure file operations with progress tracking and comprehensive validation. """ import asyncio from datetime import datetime from typing import Dict, List, Optional, Any from uuid import UUID from sanic import Blueprint, Request, response from sanic.response import JSONResponse, ResponseStream from sqlalchemy import select, update from app.core.config import get_settings from app.core.database import get_async_session, get_cache_manager from app.core.logging import get_logger from app.core.storage import StorageManager from app.core.security import validate_file_signature, generate_secure_filename from app.api.middleware import require_auth, validate_request, rate_limit from app.core.validation import StorageUploadSchema, ChunkUploadSchema # Initialize blueprint storage_bp = Blueprint("storage", url_prefix="/api/v1/storage") logger = get_logger(__name__) settings = get_settings() @storage_bp.route("/upload", methods=["POST"]) @rate_limit(limit=10, window=3600) # 10 upload sessions per hour @require_auth(permissions=["storage.upload"]) @validate_request(StorageUploadSchema) async def initiate_upload(request: Request) -> JSONResponse: """ Initiate chunked file upload session with security validation. Args: request: Sanic request with upload parameters Returns: JSONResponse: Upload session information """ try: data = request.json user_id = request.ctx.user.id # Validate file size against user quota cache_manager = get_cache_manager() quota_key = f"user:{user_id}:storage_quota" current_usage = await cache_manager.get(quota_key, default=0) if current_usage + data["file_size"] > settings.MAX_STORAGE_PER_USER: return response.json( {"error": "Storage quota exceeded", "code": "QUOTA_EXCEEDED"}, status=429 ) # Generate secure filename secure_filename = generate_secure_filename(data["filename"], user_id) # Validate content type allowed_types = { 'image/jpeg', 'image/png', 'image/gif', 'image/webp', 'video/mp4', 'video/webm', 'video/avi', 'audio/mpeg', 'audio/wav', 'audio/flac', 'audio/ogg', 'application/pdf', 'text/plain', 'application/json', 'application/zip', 'application/x-rar' } if data["content_type"] not in allowed_types: return response.json( {"error": "File type not allowed", "code": "TYPE_NOT_ALLOWED"}, status=400 ) # Create content record first async with get_async_session() as session: from app.core.models.content import Content content = Content( user_id=user_id, title=secure_filename, content_type=data["content_type"], file_size=data["file_size"], status="uploading", visibility="private" ) session.add(content) await session.commit() await session.refresh(content) # Create upload session storage_manager = StorageManager() upload_session = await storage_manager.create_upload_session( content.id, data["file_size"] ) # Update user quota await cache_manager.increment(quota_key, data["file_size"], ttl=86400) await logger.ainfo( "Upload session initiated", user_id=str(user_id), content_id=str(content.id), filename=secure_filename, file_size=data["file_size"] ) return response.json({ "upload_session": upload_session, "content_id": str(content.id), "secure_filename": secure_filename, "status": "ready_for_upload" }, status=201) except Exception as e: await logger.aerror( "Failed to initiate upload", user_id=str(user_id), error=str(e) ) return response.json( {"error": "Failed to initiate upload", "code": "UPLOAD_INIT_FAILED"}, status=500 ) @storage_bp.route("/upload//chunk", methods=["POST"]) 


@storage_bp.route("/upload/<upload_id:uuid>/chunk", methods=["POST"])
@rate_limit(limit=1000, window=3600)  # 1000 chunks per hour
@require_auth(permissions=["storage.upload"])
async def upload_chunk(request: Request, upload_id: UUID) -> JSONResponse:
    """
    Upload an individual file chunk with validation and progress tracking.

    Args:
        request: Sanic request with chunk data
        upload_id: Upload session UUID

    Returns:
        JSONResponse: Chunk upload status
    """
    user_id = request.ctx.user.id

    try:
        # Get chunk data from form
        if 'chunk' not in request.files:
            return response.json(
                {"error": "No chunk data provided", "code": "NO_CHUNK_DATA"},
                status=400
            )

        chunk_file = request.files['chunk'][0]
        chunk_data = chunk_file.body

        # Get chunk metadata
        chunk_index = int(request.form.get('chunk_index', 0))
        chunk_hash = request.form.get('chunk_hash', '')
        is_final = request.form.get('is_final', 'false').lower() == 'true'

        if not chunk_hash:
            return response.json(
                {"error": "Chunk hash required", "code": "HASH_REQUIRED"},
                status=400
            )

        # Validate chunk size
        if len(chunk_data) > settings.MAX_CHUNK_SIZE:
            return response.json(
                {"error": "Chunk too large", "code": "CHUNK_TOO_LARGE"},
                status=400
            )

        # Upload chunk
        storage_manager = StorageManager()
        result = await storage_manager.upload_chunk(
            upload_id, chunk_index, chunk_data, chunk_hash
        )

        # Check if upload is complete
        if is_final or result["uploaded_chunks"] == result["total_chunks"]:
            # Finalize upload
            finalize_result = await storage_manager.finalize_upload(upload_id)
            result.update(finalize_result)

            await logger.ainfo(
                "Upload completed",
                upload_id=str(upload_id),
                user_id=str(user_id),
                content_id=finalize_result.get("content_id")
            )

        return response.json(result)

    except ValueError as e:
        await logger.awarning(
            "Chunk upload validation failed",
            upload_id=str(upload_id),
            user_id=str(user_id),
            error=str(e)
        )
        return response.json(
            {"error": str(e), "code": "VALIDATION_FAILED"},
            status=400
        )
    except Exception as e:
        await logger.aerror(
            "Chunk upload failed",
            upload_id=str(upload_id),
            user_id=str(user_id),
            error=str(e)
        )
        return response.json(
            {"error": "Chunk upload failed", "code": "CHUNK_UPLOAD_FAILED"},
            status=500
        )
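
# Example chunk request. A hypothetical sketch: the multipart field names ("chunk",
# "chunk_index", "chunk_hash", "is_final") mirror what upload_chunk reads above, while
# httpx, BASE_URL, AUTH_TOKEN, and the use of SHA-256 for the chunk hash are assumptions
# (the hash algorithm expected by StorageManager is not shown in this module).
#
#   import hashlib
#   import httpx
#
#   async def send_chunk(upload_id: str, index: int, data: bytes, final: bool) -> dict:
#       async with httpx.AsyncClient(base_url=BASE_URL) as client:
#           resp = await client.post(
#               f"/api/v1/storage/upload/{upload_id}/chunk",
#               headers={"Authorization": f"Bearer {AUTH_TOKEN}"},
#               files={"chunk": ("chunk.bin", data, "application/octet-stream")},
#               data={
#                   "chunk_index": str(index),
#                   "chunk_hash": hashlib.sha256(data).hexdigest(),
#                   "is_final": "true" if final else "false",
#               },
#           )
#           resp.raise_for_status()
#           return resp.json()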


@storage_bp.route("/upload/<upload_id:uuid>/status", methods=["GET"])
@rate_limit(limit=100, window=3600)  # 100 status checks per hour
@require_auth(permissions=["storage.upload"])
async def get_upload_status(request: Request, upload_id: UUID) -> JSONResponse:
    """
    Get upload session status and progress.

    Args:
        request: Sanic request object
        upload_id: Upload session UUID

    Returns:
        JSONResponse: Upload progress information
    """
    user_id = request.ctx.user.id

    try:
        storage_manager = StorageManager()

        # Get session data
        session_data = await storage_manager._get_upload_session(upload_id)
        if not session_data:
            return response.json(
                {"error": "Upload session not found", "code": "SESSION_NOT_FOUND"},
                status=404
            )

        # Verify user ownership
        async with get_async_session() as session:
            from app.core.models.content import Content

            stmt = select(Content).where(
                Content.id == UUID(session_data["content_id"])
            )
            result = await session.execute(stmt)
            content = result.scalar_one_or_none()

            if not content or content.user_id != user_id:
                return response.json(
                    {"error": "Access denied", "code": "ACCESS_DENIED"},
                    status=403
                )

        # Calculate progress
        uploaded_chunks = len(session_data.get("uploaded_chunks", []))
        total_chunks = session_data["total_chunks"]
        progress_percent = (uploaded_chunks / total_chunks * 100) if total_chunks > 0 else 0

        return response.json({
            "upload_id": str(upload_id),
            "status": session_data["status"],
            "progress": {
                "uploaded_chunks": uploaded_chunks,
                "total_chunks": total_chunks,
                "percent": round(progress_percent, 2)
            },
            "created_at": session_data["created_at"],
            "expires_at": session_data["expires_at"]
        })

    except Exception as e:
        await logger.aerror(
            "Failed to get upload status",
            upload_id=str(upload_id),
            user_id=str(user_id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to get upload status", "code": "STATUS_FAILED"},
            status=500
        )
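
# Example progress poll. A hypothetical client-side sketch built on the status endpoint
# above; the httpx client, AUTH_TOKEN, and the two-second poll interval are assumptions.
#
#   async def wait_for_upload(client: "httpx.AsyncClient", upload_id: str) -> None:
#       while True:
#           resp = await client.get(
#               f"/api/v1/storage/upload/{upload_id}/status",
#               headers={"Authorization": f"Bearer {AUTH_TOKEN}"},
#           )
#           resp.raise_for_status()
#           progress = resp.json()["progress"]
#           if progress["uploaded_chunks"] == progress["total_chunks"]:
#               return
#           await asyncio.sleep(2)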


@storage_bp.route("/upload/<upload_id:uuid>", methods=["DELETE"])
@rate_limit(limit=50, window=3600)  # 50 cancellations per hour
@require_auth(permissions=["storage.upload"])
async def cancel_upload(request: Request, upload_id: UUID) -> JSONResponse:
    """
    Cancel an upload session and clean up temporary files.

    Args:
        request: Sanic request object
        upload_id: Upload session UUID

    Returns:
        JSONResponse: Cancellation status
    """
    user_id = request.ctx.user.id

    try:
        storage_manager = StorageManager()

        # Get session data
        session_data = await storage_manager._get_upload_session(upload_id)
        if not session_data:
            return response.json(
                {"error": "Upload session not found", "code": "SESSION_NOT_FOUND"},
                status=404
            )

        # Verify user ownership
        content_id = UUID(session_data["content_id"])

        async with get_async_session() as session:
            from app.core.models.content import Content

            stmt = select(Content).where(Content.id == content_id)
            result = await session.execute(stmt)
            content = result.scalar_one_or_none()

            if not content or content.user_id != user_id:
                return response.json(
                    {"error": "Access denied", "code": "ACCESS_DENIED"},
                    status=403
                )

            # Delete content record
            await session.delete(content)
            await session.commit()

        # Clean up chunks and session
        cache_manager = get_cache_manager()
        session_key = f"upload_session:{upload_id}"
        await cache_manager.delete(session_key)

        # Clean up chunks from storage
        for chunk_index in session_data.get("uploaded_chunks", []):
            chunk_id = f"{upload_id}_{chunk_index:06d}"
            await storage_manager.backend.delete_chunk(chunk_id)

        # Update user quota
        quota_key = f"user:{user_id}:storage_quota"
        await cache_manager.decrement(quota_key, session_data.get("total_size", 0))

        await logger.ainfo(
            "Upload cancelled",
            upload_id=str(upload_id),
            user_id=str(user_id),
            content_id=str(content_id)
        )

        return response.json({
            "status": "cancelled",
            "upload_id": str(upload_id)
        })

    except Exception as e:
        await logger.aerror(
            "Failed to cancel upload",
            upload_id=str(upload_id),
            user_id=str(user_id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to cancel upload", "code": "CANCEL_FAILED"},
            status=500
        )
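
# Example cancellation. Same hypothetical client assumptions as the sketches above; a
# successful call removes the pending Content row, the cached session, and any uploaded chunks.
#
#   resp = await client.delete(
#       f"/api/v1/storage/upload/{upload_id}",
#       headers={"Authorization": f"Bearer {AUTH_TOKEN}"},
#   )
#   assert resp.json()["status"] == "cancelled"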


@storage_bp.route("/files/<content_id:uuid>", methods=["DELETE"])
@rate_limit(limit=50, window=3600)  # 50 deletions per hour
@require_auth(permissions=["storage.delete"])
async def delete_file(request: Request, content_id: UUID) -> JSONResponse:
    """
    Delete a content file and clean up its storage.

    Args:
        request: Sanic request object
        content_id: Content UUID to delete

    Returns:
        JSONResponse: Deletion status
    """
    user_id = request.ctx.user.id

    try:
        async with get_async_session() as session:
            from app.core.models.content import Content

            # Get content
            stmt = select(Content).where(Content.id == content_id)
            result = await session.execute(stmt)
            content = result.scalar_one_or_none()

            if not content:
                return response.json(
                    {"error": "Content not found", "code": "NOT_FOUND"},
                    status=404
                )

            # Check permissions
            if content.user_id != user_id and not request.ctx.user.is_admin:
                return response.json(
                    {"error": "Access denied", "code": "ACCESS_DENIED"},
                    status=403
                )

            # Delete files
            storage_manager = StorageManager()
            deletion_success = await storage_manager.delete_content_files(content_id)

            if not deletion_success:
                await logger.awarning(
                    "File deletion partially failed",
                    content_id=str(content_id),
                    user_id=str(user_id)
                )

            # Update user quota
            cache_manager = get_cache_manager()
            quota_key = f"user:{user_id}:storage_quota"
            await cache_manager.decrement(quota_key, content.file_size or 0)

            # Clear caches
            await cache_manager.delete(f"content:{content_id}")
            await cache_manager.delete(f"content:{content_id}:full")

            await logger.ainfo(
                "Content deleted",
                content_id=str(content_id),
                user_id=str(user_id),
                file_size=content.file_size
            )

            return response.json({
                "status": "deleted",
                "content_id": str(content_id)
            })

    except Exception as e:
        await logger.aerror(
            "Failed to delete content",
            content_id=str(content_id),
            user_id=str(user_id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to delete content", "code": "DELETE_FAILED"},
            status=500
        )


@storage_bp.route("/quota", methods=["GET"])
@rate_limit(limit=100, window=3600)  # 100 quota checks per hour
@require_auth(permissions=["storage.read"])
async def get_storage_quota(request: Request) -> JSONResponse:
    """
    Get user storage quota and usage information.

    Args:
        request: Sanic request object

    Returns:
        JSONResponse: Quota information
    """
    user_id = request.ctx.user.id

    try:
        # Get current usage from cache
        cache_manager = get_cache_manager()
        quota_key = f"user:{user_id}:storage_quota"
        current_usage = await cache_manager.get(quota_key, default=0)

        # Calculate accurate usage from database
        async with get_async_session() as session:
            from sqlalchemy import func
            from app.core.models.content import Content

            stmt = select(
                func.count(Content.id).label('file_count'),
                func.sum(Content.file_size).label('total_size')
            ).where(
                Content.user_id == user_id,
                Content.status == 'completed'
            )

            result = await session.execute(stmt)
            stats = result.first()

            accurate_usage = stats.total_size or 0
            file_count = stats.file_count or 0

        # Update cache with accurate value
        if abs(current_usage - accurate_usage) > 1024:  # Update if difference > 1KB
            await cache_manager.set(quota_key, accurate_usage, ttl=86400)
            current_usage = accurate_usage

        # Calculate quota information
        max_quota = settings.MAX_STORAGE_PER_USER
        usage_percent = (current_usage / max_quota * 100) if max_quota > 0 else 0

        return response.json({
            "quota": {
                "used_bytes": current_usage,
                "max_bytes": max_quota,
                "available_bytes": max(0, max_quota - current_usage),
                "usage_percent": round(usage_percent, 2)
            },
            "files": {
                "count": file_count,
                "max_files": settings.MAX_FILES_PER_USER
            },
            "updated_at": datetime.utcnow().isoformat()
        })

    except Exception as e:
        await logger.aerror(
            "Failed to get storage quota",
            user_id=str(user_id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to get quota information", "code": "QUOTA_FAILED"},
            status=500
        )
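
# Example response body. The shape follows directly from the handler above; the numbers
# are illustrative only (1 GiB used out of an assumed 10 GiB quota).
#
#   {
#       "quota": {
#           "used_bytes": 1073741824,
#           "max_bytes": 10737418240,
#           "available_bytes": 9663676416,
#           "usage_percent": 10.0
#       },
#       "files": {"count": 42, "max_files": 1000},
#       "updated_at": "2024-01-01T00:00:00"
#   }
#
# Note that the cached counter is treated as a hint: whenever it drifts from the database
# total by more than 1 KiB, the handler rewrites the cache with the accurate value.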


@storage_bp.route("/stats", methods=["GET"])
@rate_limit(limit=50, window=3600)  # 50 stats requests per hour
@require_auth(permissions=["storage.read"])
async def get_storage_stats(request: Request) -> JSONResponse:
    """
    Get detailed storage statistics for the user.

    Args:
        request: Sanic request object

    Returns:
        JSONResponse: Detailed storage statistics
    """
    user_id = request.ctx.user.id

    try:
        async with get_async_session() as session:
            from sqlalchemy import func
            from app.core.models.content import Content

            # Get statistics by content type
            type_stmt = select(
                Content.content_type,
                func.count(Content.id).label('count'),
                func.sum(Content.file_size).label('size'),
                func.avg(Content.file_size).label('avg_size')
            ).where(
                Content.user_id == user_id,
                Content.status == 'completed'
            ).group_by(Content.content_type)

            type_result = await session.execute(type_stmt)
            type_stats = {
                row.content_type: {
                    'count': row.count,
                    'total_size': row.size or 0,
                    # avg() returns a Decimal; convert so the response stays JSON-serializable
                    'average_size': float(row.avg_size or 0)
                }
                for row in type_result
            }

            # Get upload statistics by month (last 12 months)
            monthly_stmt = select(
                func.date_trunc('month', Content.created_at).label('month'),
                func.count(Content.id).label('uploads'),
                func.sum(Content.file_size).label('size')
            ).where(
                Content.user_id == user_id,
                Content.status == 'completed',
                Content.created_at >= datetime.utcnow().replace(day=1) - timedelta(days=365)
            ).group_by(func.date_trunc('month', Content.created_at))

            monthly_result = await session.execute(monthly_stmt)
            monthly_stats = [
                {
                    'month': row.month.isoformat(),
                    'uploads': row.uploads,
                    'size': row.size or 0
                }
                for row in monthly_result
            ]

        return response.json({
            "by_type": type_stats,
            "monthly": monthly_stats,
            "generated_at": datetime.utcnow().isoformat()
        })

    except Exception as e:
        await logger.aerror(
            "Failed to get storage stats",
            user_id=str(user_id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to get storage statistics", "code": "STATS_FAILED"},
            status=500
        )


@storage_bp.route("/cleanup", methods=["POST"])
@rate_limit(limit=5, window=3600)  # 5 cleanup operations per hour
@require_auth(permissions=["storage.admin"])
async def cleanup_orphaned_files(request: Request) -> JSONResponse:
    """
    Clean up orphaned files and incomplete uploads (admin only).

    Args:
        request: Sanic request object

    Returns:
        JSONResponse: Cleanup results
    """
    try:
        if not request.ctx.user.is_admin:
            return response.json(
                {"error": "Admin access required", "code": "ADMIN_REQUIRED"},
                status=403
            )

        storage_manager = StorageManager()
        cache_manager = get_cache_manager()

        cleanup_stats = {
            "orphaned_chunks": 0,
            "expired_sessions": 0,
            "failed_uploads": 0,
            "freed_space": 0
        }

        # Clean up expired upload sessions
        async with get_async_session() as session:
            from app.core.models.storage import ContentUploadSession
            from app.core.models.content import Content

            # Get expired sessions
            expired_sessions_stmt = select(ContentUploadSession).where(
                ContentUploadSession.expires_at < datetime.utcnow()
            )
            expired_result = await session.execute(expired_sessions_stmt)
            expired_sessions = expired_result.scalars().all()

            for upload_session in expired_sessions:
                # Clean up chunks
                session_key = f"upload_session:{upload_session.id}"
                session_data = await cache_manager.get(session_key)

                if session_data:
                    for chunk_index in session_data.get("uploaded_chunks", []):
                        chunk_id = f"{upload_session.id}_{chunk_index:06d}"
                        if await storage_manager.backend.delete_chunk(chunk_id):
                            cleanup_stats["orphaned_chunks"] += 1

                # Delete session
                await session.delete(upload_session)
                await cache_manager.delete(session_key)
                cleanup_stats["expired_sessions"] += 1

            # Clean up failed uploads (older than 24 hours)
            failed_uploads_stmt = select(Content).where(
                Content.status.in_(['uploading', 'processing', 'failed']),
                Content.created_at < datetime.utcnow() - timedelta(hours=24)
            )
            failed_result = await session.execute(failed_uploads_stmt)
            failed_uploads = failed_result.scalars().all()

            for content in failed_uploads:
                if content.file_path:
                    if await storage_manager.backend.delete_file(content.file_path):
                        cleanup_stats["freed_space"] += content.file_size or 0

                await session.delete(content)
                cleanup_stats["failed_uploads"] += 1

            await session.commit()

        await logger.ainfo(
            "Storage cleanup completed",
            **cleanup_stats,
            admin_user=str(request.ctx.user.id)
        )

        return response.json({
            "status": "cleanup_completed",
            "results": cleanup_stats,
            "timestamp": datetime.utcnow().isoformat()
        })

    except Exception as e:
        await logger.aerror(
            "Storage cleanup failed",
            admin_user=str(request.ctx.user.id),
            error=str(e)
        )
        return response.json(
            {"error": "Cleanup operation failed", "code": "CLEANUP_FAILED"},
            status=500
        )
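
# The cleanup endpoint is triggered manually by an admin. A minimal sketch for running the
# same work on a schedule instead, assuming it is wired up in the application factory and
# that run_storage_cleanup is a hypothetical helper wrapping the session/chunk cleanup
# performed above:
#
#   @app.before_server_start
#   async def schedule_cleanup(app, _):
#       async def cleanup_loop():
#           while True:
#               await run_storage_cleanup()
#               await asyncio.sleep(6 * 3600)  # every six hours; the interval is an assumption
#       app.add_task(cleanup_loop())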