# uploader-bot/app/core/storage.py
"""
Storage management with chunked uploads, pluggable backends, and encryption support.
Ships a local filesystem backend (S3-compatible backends can be added by subclassing
StorageBackend) and uses async I/O throughout, with Redis-backed upload-session caching.
"""
import asyncio
import hashlib
import mimetypes
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, AsyncGenerator, Any, Tuple
from uuid import UUID, uuid4
import aiofiles
import aiofiles.os
from sqlalchemy import select, update
from sqlalchemy.orm import selectinload
from app.core.config import get_settings
from app.core.database import get_async_session, get_cache_manager
from app.core.logging import get_logger
from app.core.models.content import Content, ContentChunk
from app.core.security import encrypt_file, decrypt_file, generate_access_token
logger = get_logger(__name__)
settings = get_settings()
class StorageBackend:
"""Abstract base class for storage backends."""
async def store_chunk(self, upload_id: UUID, chunk_index: int, data: bytes) -> str:
"""Store a file chunk and return its identifier."""
raise NotImplementedError
async def retrieve_chunk(self, chunk_id: str) -> bytes:
"""Retrieve a file chunk by its identifier."""
raise NotImplementedError
async def delete_chunk(self, chunk_id: str) -> bool:
"""Delete a file chunk."""
raise NotImplementedError
async def assemble_file(self, upload_id: UUID, chunks: List[str]) -> str:
"""Assemble chunks into final file and return file path."""
raise NotImplementedError
async def delete_file(self, file_path: str) -> bool:
"""Delete a complete file."""
raise NotImplementedError
async def get_file_stream(self, file_path: str) -> AsyncGenerator[bytes, None]:
"""Get async file stream for download."""
raise NotImplementedError
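# Implementing another backend (for example an S3-compatible one) means subclassing
# StorageBackend and providing all six methods above. A minimal sketch, assuming a
# hypothetical aioboto3-style client held on the instance (nothing below exists in
# this module):
#
#     class S3StorageBackend(StorageBackend):
#         async def store_chunk(self, upload_id, chunk_index, data):
#             key = f"chunks/{upload_id}_{chunk_index:06d}"
#             await self._client.put_object(Bucket=self._bucket, Key=key, Body=data)
#             return key
#
# StorageManager only talks to this interface, so swapping backends does not touch
# the upload or finalize flow.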
class LocalStorageBackend(StorageBackend):
"""Local filesystem storage backend with encryption support."""
def __init__(self):
self.base_path = Path(settings.STORAGE_PATH)
self.chunks_path = self.base_path / "chunks"
self.files_path = self.base_path / "files"
# Create directories if they don't exist
self.chunks_path.mkdir(parents=True, exist_ok=True)
self.files_path.mkdir(parents=True, exist_ok=True)
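# Resulting on-disk layout under settings.STORAGE_PATH:
#     chunks/<upload_id>_<chunk_index>.chunk   (temporary; removed after assembly)
#     files/<file_uuid>                        (assembled final files)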
async def store_chunk(self, upload_id: UUID, chunk_index: int, data: bytes) -> str:
"""Store chunk to local filesystem with optional encryption."""
try:
chunk_id = f"{upload_id}_{chunk_index:06d}"
chunk_path = self.chunks_path / f"{chunk_id}.chunk"
# Encrypt chunk if encryption is enabled
if settings.ENCRYPT_FILES:
data = encrypt_file(data, str(upload_id))
async with aiofiles.open(chunk_path, 'wb') as f:
await f.write(data)
await logger.adebug(
"Chunk stored successfully",
upload_id=str(upload_id),
chunk_index=chunk_index,
chunk_size=len(data)
)
return chunk_id
except Exception as e:
await logger.aerror(
"Failed to store chunk",
upload_id=str(upload_id),
chunk_index=chunk_index,
error=str(e)
)
raise
async def retrieve_chunk(self, chunk_id: str) -> bytes:
"""Retrieve and optionally decrypt chunk from local filesystem."""
try:
chunk_path = self.chunks_path / f"{chunk_id}.chunk"
if not chunk_path.exists():
raise FileNotFoundError(f"Chunk {chunk_id} not found")
async with aiofiles.open(chunk_path, 'rb') as f:
data = await f.read()
# Decrypt chunk if encryption is enabled
if settings.ENCRYPT_FILES:
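# chunk_id has the form "<upload_uuid>_<chunk_index>"; UUIDs never contain
# underscores, so the first "_"-separated field recovers the upload id that
# was used when encrypting the chunk.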
upload_id = chunk_id.split('_')[0]
data = decrypt_file(data, upload_id)
return data
except Exception as e:
await logger.aerror("Failed to retrieve chunk", chunk_id=chunk_id, error=str(e))
raise
async def delete_chunk(self, chunk_id: str) -> bool:
"""Delete chunk file from local filesystem."""
try:
chunk_path = self.chunks_path / f"{chunk_id}.chunk"
if chunk_path.exists():
await aiofiles.os.remove(chunk_path)
return True
return False
except Exception as e:
await logger.aerror("Failed to delete chunk", chunk_id=chunk_id, error=str(e))
return False
async def assemble_file(self, upload_id: UUID, chunks: List[str]) -> str:
"""Assemble chunks into final file."""
try:
file_id = str(uuid4())
file_path = self.files_path / f"{file_id}"
async with aiofiles.open(file_path, 'wb') as output_file:
for chunk_id in chunks:
chunk_data = await self.retrieve_chunk(chunk_id)
await output_file.write(chunk_data)
# Clean up chunks after assembly
for chunk_id in chunks:
await self.delete_chunk(chunk_id)
await logger.ainfo(
"File assembled successfully",
upload_id=str(upload_id),
file_path=str(file_path),
chunks_count=len(chunks)
)
return str(file_path)
except Exception as e:
await logger.aerror(
"Failed to assemble file",
upload_id=str(upload_id),
error=str(e)
)
raise
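# assemble_file expects chunk ids in ascending chunk_index order; finalize_upload
# guarantees this by ordering its ContentChunk query, and the source chunks are
# deleted from disk once the final file has been written.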
async def delete_file(self, file_path: str) -> bool:
"""Delete file from local filesystem."""
try:
path = Path(file_path)
if path.exists() and path.is_file():
await aiofiles.os.remove(path)
return True
return False
except Exception as e:
await logger.aerror("Failed to delete file", file_path=file_path, error=str(e))
return False
async def get_file_stream(self, file_path: str) -> AsyncGenerator[bytes, None]:
"""Stream file content for download."""
try:
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File {file_path} not found")
async with aiofiles.open(path, 'rb') as f:
while True:
chunk = await f.read(65536) # 64KB chunks
if not chunk:
break
yield chunk
except Exception as e:
await logger.aerror("Failed to stream file", file_path=file_path, error=str(e))
raise
class StorageManager:
"""Main storage manager with upload session management and caching."""
def __init__(self):
self.backend = LocalStorageBackend() # Can be extended to support S3, etc.
self.cache_manager = get_cache_manager()
async def create_upload_session(self, content_id: UUID, total_size: int) -> Dict[str, Any]:
"""Create new upload session with chunked upload support."""
try:
upload_id = uuid4()
session_data = {
"upload_id": str(upload_id),
"content_id": str(content_id),
"total_size": total_size,
"chunk_size": settings.CHUNK_SIZE,
"total_chunks": (total_size + settings.CHUNK_SIZE - 1) // settings.CHUNK_SIZE,
"uploaded_chunks": [],
"created_at": datetime.utcnow().isoformat(),
"expires_at": (datetime.utcnow() + timedelta(hours=24)).isoformat(),
"status": "active"
}
# Store session in cache
session_key = f"upload_session:{upload_id}"
await self.cache_manager.set(session_key, session_data, ttl=86400) # 24 hours
# Store in database for persistence
async with get_async_session() as session:
upload_session = ContentUploadSession(
id=upload_id,
content_id=content_id,
total_size=total_size,
chunk_size=settings.CHUNK_SIZE,
total_chunks=session_data["total_chunks"],
expires_at=datetime.fromisoformat(session_data["expires_at"])
)
session.add(upload_session)
await session.commit()
await logger.ainfo(
"Upload session created",
upload_id=str(upload_id),
content_id=str(content_id),
total_size=total_size
)
return {
"upload_id": str(upload_id),
"chunk_size": settings.CHUNK_SIZE,
"total_chunks": session_data["total_chunks"],
"upload_url": f"/api/v1/storage/upload/{upload_id}",
"expires_at": session_data["expires_at"]
}
except Exception as e:
await logger.aerror(
"Failed to create upload session",
content_id=str(content_id),
error=str(e)
)
raise
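# Example of the session descriptor returned to the client (illustrative values):
#
#     {
#         "upload_id": "7f0c...",
#         "chunk_size": 1048576,
#         "total_chunks": 11,
#         "upload_url": "/api/v1/storage/upload/7f0c...",
#         "expires_at": "2024-01-01T12:00:00",
#     }
#
# The chunk count uses ceiling division, so a 10.5 MiB file with a 1 MiB chunk size
# yields 11 chunks.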
async def upload_chunk(
self,
upload_id: UUID,
chunk_index: int,
chunk_data: bytes,
chunk_hash: str
) -> Dict[str, Any]:
"""Upload and validate a file chunk."""
try:
# Verify chunk hash
calculated_hash = hashlib.sha256(chunk_data).hexdigest()
if calculated_hash != chunk_hash:
raise ValueError("Chunk hash mismatch")
# Get upload session
session_data = await self._get_upload_session(upload_id)
if not session_data:
raise ValueError("Upload session not found or expired")
# Check if chunk already uploaded
if chunk_index in session_data.get("uploaded_chunks", []):
return {"status": "already_uploaded", "chunk_index": chunk_index}
# Store chunk
chunk_id = await self.backend.store_chunk(upload_id, chunk_index, chunk_data)
# Update session data
session_data["uploaded_chunks"].append(chunk_index)
session_data["uploaded_chunks"].sort()
session_key = f"upload_session:{upload_id}"
await self.cache_manager.set(session_key, session_data, ttl=86400)
# Store chunk info in database
async with get_async_session() as session:
chunk_record = ContentChunk(
upload_id=upload_id,
chunk_index=chunk_index,
chunk_id=chunk_id,
chunk_hash=chunk_hash,
chunk_size=len(chunk_data)
)
session.add(chunk_record)
await session.commit()
await logger.adebug(
"Chunk uploaded successfully",
upload_id=str(upload_id),
chunk_index=chunk_index,
chunk_size=len(chunk_data)
)
return {
"status": "uploaded",
"chunk_index": chunk_index,
"uploaded_chunks": len(session_data["uploaded_chunks"]),
"total_chunks": session_data["total_chunks"]
}
except Exception as e:
await logger.aerror(
"Failed to upload chunk",
upload_id=str(upload_id),
chunk_index=chunk_index,
error=str(e)
)
raise
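# Callers must send the SHA-256 hex digest of each chunk. A minimal sketch of the
# client-side computation (illustrative; not part of this module):
#
#     chunk_hash = hashlib.sha256(chunk_bytes).hexdigest()
#
# The digest is recomputed server-side and a mismatch rejects the chunk, while
# re-sending an already uploaded chunk_index is a no-op ("already_uploaded").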
async def finalize_upload(self, upload_id: UUID) -> Dict[str, Any]:
"""Finalize upload by assembling chunks into final file."""
try:
# Get upload session
session_data = await self._get_upload_session(upload_id)
if not session_data:
raise ValueError("Upload session not found")
# Verify all chunks are uploaded
uploaded_chunks = session_data.get("uploaded_chunks", [])
total_chunks = session_data["total_chunks"]
if len(uploaded_chunks) != total_chunks:
missing_chunks = set(range(total_chunks)) - set(uploaded_chunks)
raise ValueError(f"Missing chunks: {missing_chunks}")
# Get chunk IDs in order
async with get_async_session() as session:
stmt = (
select(ContentChunk)
.where(ContentChunk.upload_id == upload_id)
.order_by(ContentChunk.chunk_index)
)
result = await session.execute(stmt)
chunks = result.scalars().all()
chunk_ids = [chunk.chunk_id for chunk in chunks]
# Assemble file
file_path = await self.backend.assemble_file(upload_id, chunk_ids)
# Update content record
async with get_async_session() as session:
stmt = (
update(Content)
.where(Content.id == UUID(session_data["content_id"]))
.values(
file_path=file_path,
status="completed",
updated_at=datetime.utcnow()
)
)
await session.execute(stmt)
await session.commit()
# Clean up session
session_key = f"upload_session:{upload_id}"
await self.cache_manager.delete(session_key)
await logger.ainfo(
"Upload finalized successfully",
upload_id=str(upload_id),
file_path=file_path,
total_chunks=total_chunks
)
return {
"status": "completed",
"file_path": file_path,
"content_id": session_data["content_id"]
}
except Exception as e:
await logger.aerror(
"Failed to finalize upload",
upload_id=str(upload_id),
error=str(e)
)
raise
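# If chunks are missing, finalization fails with the missing indices in the error,
# so a client can re-upload only those chunks and call finalize again. On success
# the cached session entry is dropped and the Content row is marked "completed".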
async def get_file_stream(self, file_path: str) -> AsyncGenerator[bytes, None]:
"""Stream file content for download via the configured backend."""
try:
# File bodies are streamed straight from the backend; they are not cached in Redis.
async for chunk in self.backend.get_file_stream(file_path):
yield chunk
except Exception as e:
await logger.aerror("Failed to get file stream", file_path=file_path, error=str(e))
raise
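# A typical consumer of this generator is an HTTP download route. A minimal sketch,
# assuming a FastAPI route layer and a module-level StorageManager instance named
# "storage" (neither is defined in this module):
#
#     from fastapi.responses import StreamingResponse
#
#     @router.get("/download/{content_id}")
#     async def download(content_id: UUID):
#         content = ...  # look up the Content row and its file_path
#         return StreamingResponse(storage.get_file_stream(content.file_path))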
async def delete_content_files(self, content_id: UUID) -> bool:
"""Delete all files associated with content."""
try:
async with get_async_session() as session:
# Get content
stmt = select(Content).where(Content.id == content_id)
result = await session.execute(stmt)
content = result.scalar_one_or_none()
if not content or not content.file_path:
return True
# Delete main file
await self.backend.delete_file(content.file_path)
# Delete any remaining chunks. Chunks are keyed by upload_id, not content_id,
# so resolve the upload sessions that belong to this content first.
upload_stmt = select(ContentUploadSession.id).where(
ContentUploadSession.content_id == content_id
)
upload_ids = (await session.execute(upload_stmt)).scalars().all()
chunk_stmt = select(ContentChunk).where(
ContentChunk.upload_id.in_(upload_ids)
)
chunk_result = await session.execute(chunk_stmt)
chunks = chunk_result.scalars().all()
for chunk in chunks:
await self.backend.delete_chunk(chunk.chunk_id)
# Update content record
update_stmt = (
update(Content)
.where(Content.id == content_id)
.values(file_path=None, status="deleted")
)
await session.execute(update_stmt)
await session.commit()
await logger.ainfo(
"Content files deleted",
content_id=str(content_id)
)
return True
except Exception as e:
await logger.aerror(
"Failed to delete content files",
content_id=str(content_id),
error=str(e)
)
return False
async def get_storage_stats(self) -> Dict[str, Any]:
"""Get storage usage statistics."""
try:
async with get_async_session() as session:
# Get total files and size
from sqlalchemy import func
stmt = select(
func.count(Content.id).label('total_files'),
func.sum(Content.file_size).label('total_size')
).where(Content.status == 'completed')
result = await session.execute(stmt)
stats = result.first()
# Get storage by type
type_stmt = select(
Content.content_type,
func.count(Content.id).label('count'),
func.sum(Content.file_size).label('size')
).where(Content.status == 'completed').group_by(Content.content_type)
type_result = await session.execute(type_stmt)
type_stats = {
row.content_type: {
'count': row.count,
'size': row.size or 0
}
for row in type_result
}
return {
'total_files': stats.total_files or 0,
'total_size': stats.total_size or 0,
'by_type': type_stats,
'updated_at': datetime.utcnow().isoformat()
}
except Exception as e:
await logger.aerror("Failed to get storage stats", error=str(e))
return {}
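# Example return value (illustrative numbers; content_type keys depend on the
# Content model):
#
#     {
#         "total_files": 42,
#         "total_size": 1073741824,
#         "by_type": {"video": {"count": 10, "size": 734003200}},
#         "updated_at": "2024-01-01T12:00:00",
#     }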
async def _get_upload_session(self, upload_id: UUID) -> Optional[Dict[str, Any]]:
"""Get upload session from cache or database."""
# Try cache first
session_key = f"upload_session:{upload_id}"
session_data = await self.cache_manager.get(session_key)
if session_data:
# Check if session is expired
expires_at = datetime.fromisoformat(session_data["expires_at"])
if expires_at > datetime.utcnow():
return session_data
# Fallback to database
try:
async with get_async_session() as session:
stmt = (
select(ContentUploadSession)
.where(ContentUploadSession.id == upload_id)
)
result = await session.execute(stmt)
upload_session = result.scalar_one_or_none()
if upload_session and upload_session.expires_at > datetime.utcnow():
# Rebuild session data
chunk_stmt = select(ContentChunk).where(
ContentChunk.upload_id == upload_id
)
chunk_result = await session.execute(chunk_stmt)
chunks = chunk_result.scalars().all()
session_data = {
"upload_id": str(upload_session.id),
"content_id": str(upload_session.content_id),
"total_size": upload_session.total_size,
"chunk_size": upload_session.chunk_size,
"total_chunks": upload_session.total_chunks,
"uploaded_chunks": [chunk.chunk_index for chunk in chunks],
"created_at": upload_session.created_at.isoformat(),
"expires_at": upload_session.expires_at.isoformat(),
"status": "active"
}
# Update cache
await self.cache_manager.set(session_key, session_data, ttl=86400)
return session_data
except Exception as e:
await logger.aerror(
"Failed to get upload session from database",
upload_id=str(upload_id),
error=str(e)
)
return None
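# Session lookups prefer the Redis cache; on a miss or an expired cache entry the
# session row and its ContentChunk records are reloaded from the database and the
# cache is repopulated with a 24-hour TTL.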
# Additional model for upload sessions (defined here because it is only used by the storage layer)
import sqlalchemy as sa
from sqlalchemy import Column, DateTime, Integer
from app.core.models.base import Base
class ContentUploadSession(Base):
"""Model for tracking chunked upload sessions."""
__tablename__ = "content_upload_sessions"
content_id = Column(sa.UUID(as_uuid=True), nullable=False)
total_size = Column(Integer, nullable=False)
chunk_size = Column(Integer, nullable=False, default=1048576)  # 1 MiB
total_chunks = Column(Integer, nullable=False)
expires_at = Column(DateTime, nullable=False)
completed_at = Column(DateTime, nullable=True)
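# Note: the primary-key "id" column is assumed to come from the shared declarative
# Base (create_upload_session assigns id=upload_id when inserting); only the
# session-specific columns are declared here.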