# uploader-bot/app/api/fastapi_storage_routes.py

"""
FastAPI маршруты для загрузки файлов с поддержкой chunked uploads
Критически важные эндпоинты для web2-client совместимости
"""
import asyncio
import base64
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Any
from uuid import UUID, uuid4

from fastapi import APIRouter, HTTPException, Request, Depends, UploadFile, File, Header
from fastapi.responses import JSONResponse, StreamingResponse
from sqlalchemy import func, select, update, delete

from app.core.config import get_settings
from app.core.database import db_manager, get_cache_manager
from app.core.logging import get_logger
from app.core.models.content_models import StoredContent as Content
from app.core.models.user import User
from app.api.fastapi_middleware import get_current_user, require_auth
# Initialize router
router = APIRouter(prefix="", tags=["storage"])
logger = get_logger(__name__)
settings = get_settings()
# Configuration
MAX_CHUNK_SIZE = 80 * 1024 * 1024  # 80 MB — upper bound for a single uploaded chunk
STORAGE_API_URL = getattr(settings, 'STORAGE_API_URL', '/api/storage')
@router.post("")
async def chunked_file_upload(
    request: Request,
    file: bytes = File(...),
    x_file_name: Optional[str] = Header(None, alias="X-File-Name"),
    x_chunk_start: Optional[str] = Header(None, alias="X-Chunk-Start"),
    x_last_chunk: Optional[str] = Header(None, alias="X-Last-Chunk"),
    x_upload_id: Optional[str] = Header(None, alias="X-Upload-ID"),
    content_type: Optional[str] = Header(None, alias="Content-Type"),
    current_user: User = Depends(get_current_user)
):
    """
    Chunked file upload compatible with web2-client.

    Handles both plain uploads (<= 80 MB in one request) and chunked uploads.

    Headers:
    - X-File-Name: base64-encoded filename
    - X-Chunk-Start: byte offset of this chunk within the file
    - X-Last-Chunk: "1" if this is the final chunk
    - X-Upload-ID: upload session id (required for every chunk after the first)
    - Content-Type: content type of the file
    """
    try:
        # Authorization: accept either an authenticated user or a raw token header
        auth_token = request.headers.get('authorization')
        if not auth_token and not current_user:
            raise HTTPException(status_code=401, detail="Authentication required")
        # Header validation
        if not x_file_name:
            raise HTTPException(status_code=400, detail="X-File-Name header required")
        if not x_chunk_start:
            raise HTTPException(status_code=400, detail="X-Chunk-Start header required")
        # Decode the base64-encoded filename
        try:
            filename = base64.b64decode(x_file_name).decode('utf-8')
        except Exception:
            raise HTTPException(status_code=400, detail="Invalid X-File-Name encoding")
        # Parse chunk parameters.
        # Fix: a malformed offset is a client error (400), not an unhandled
        # ValueError that the outer handler turned into a 500.
        try:
            chunk_start = int(x_chunk_start)
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid X-Chunk-Start value")
        is_last_chunk = x_last_chunk == "1"
        upload_id = x_upload_id
        # Chunk size validation
        if len(file) > MAX_CHUNK_SIZE:
            raise HTTPException(status_code=413, detail="Chunk too large")
        # Fix: any chunk after the first must reference an existing session.
        # Previously this fell through to a lookup of "upload_session:None"
        # and produced a misleading 404.
        if chunk_start != 0 and not upload_id:
            raise HTTPException(status_code=400, detail="X-Upload-ID header required")
        cache_manager = await get_cache_manager()
        # First chunk (offset 0 and no upload_id): create a new upload session
        if chunk_start == 0 and not upload_id:
            upload_id = str(uuid4())
            upload_session = {
                "upload_id": upload_id,
                "filename": filename,
                "content_type": content_type or "application/octet-stream",
                "user_id": str(current_user.id) if current_user else "anonymous",
                "chunks": {},
                "total_size": 0,
                "created_at": datetime.utcnow().isoformat(),
                "status": "uploading"
            }
            # Persist the session in the cache
            session_key = f"upload_session:{upload_id}"
            await cache_manager.set(session_key, upload_session, ttl=3600)  # 1 hour
            await logger.ainfo(
                "New upload session created",
                upload_id=upload_id,
                filename=filename,
                user_id=str(current_user.id) if current_user else "anonymous"
            )
        # Load the upload session
        session_key = f"upload_session:{upload_id}"
        upload_session = await cache_manager.get(session_key)
        if not upload_session:
            raise HTTPException(status_code=404, detail="Upload session not found")
        # Store the chunk (base64 so the bytes survive JSON-based cache backends)
        chunk_key = f"upload_chunk:{upload_id}:{chunk_start}"
        chunk_data = {
            "data": base64.b64encode(file).decode(),
            "start": chunk_start,
            "size": len(file),
            "uploaded_at": datetime.utcnow().isoformat()
        }
        await cache_manager.set(chunk_key, chunk_data, ttl=3600)
        # Update the session. Fix: total_size is the sum of all recorded
        # chunks rather than `chunk_start + len(file)`, which stays correct
        # when a chunk is retried or arrives out of order.
        upload_session["chunks"][str(chunk_start)] = len(file)
        upload_session["total_size"] = sum(upload_session["chunks"].values())
        await cache_manager.set(session_key, upload_session, ttl=3600)
        # Final chunk: assemble the file, persist it, and clean up
        if is_last_chunk:
            try:
                # Reassemble the full file from cached chunks
                file_content = await _assemble_file_chunks(upload_id, upload_session)
                # Create the content record in the database
                content_id = await _create_content_record(
                    filename=filename,
                    content_type=content_type or "application/octet-stream",
                    file_size=len(file_content),
                    user_id=current_user.id if current_user else None
                )
                # Persist the file (a real filesystem/object store belongs here)
                file_hash = hashlib.sha256(file_content).hexdigest()
                # Drop the temporary session and chunk data
                await _cleanup_upload_session(upload_id, upload_session)
                await logger.ainfo(
                    "File upload completed",
                    upload_id=upload_id,
                    content_id=content_id,
                    filename=filename,
                    file_size=len(file_content),
                    user_id=str(current_user.id) if current_user else "anonymous"
                )
                # Completed-upload response (web2-client wire format)
                return {
                    "content_sha256": file_hash,
                    "content_id_v1": content_id,
                    "content_id": content_id,
                    "content_url": f"/api/v1/content/{content_id}/download",
                    "upload_id": upload_id,
                    "status": "completed"
                }
            except Exception as e:
                await logger.aerror(
                    "Failed to finalize upload",
                    upload_id=upload_id,
                    error=str(e)
                )
                raise HTTPException(status_code=500, detail="Failed to finalize upload")
        else:
            # Intermediate response: more chunks expected
            current_size = upload_session["total_size"]
            await logger.adebug(
                "Chunk uploaded",
                upload_id=upload_id,
                chunk_start=chunk_start,
                chunk_size=len(file),
                current_size=current_size
            )
            return {
                "upload_id": upload_id,
                "current_size": current_size,
                "chunk_uploaded": True,
                "chunks_received": len(upload_session["chunks"])
            }
    except HTTPException:
        raise
    except Exception as e:
        await logger.aerror(
            "Chunked upload failed",
            filename=x_file_name,
            chunk_start=x_chunk_start,
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Upload failed")
@router.get("/upload/{upload_id}/status")
async def get_upload_status(
    upload_id: str,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    """Report the progress of an in-flight chunked upload."""
    try:
        # Require either an authenticated user or a bearer token header.
        if not current_user and not request.headers.get('authorization'):
            raise HTTPException(status_code=401, detail="Authentication required")
        cache_manager = await get_cache_manager()
        session = await cache_manager.get(f"upload_session:{upload_id}")
        if not session:
            raise HTTPException(status_code=404, detail="Upload session not found")
        # Only the session owner may inspect it.
        if current_user and session.get("user_id") != str(current_user.id):
            raise HTTPException(status_code=403, detail="Access denied")
        # Progress summary straight from the cached session record.
        return {
            "upload_id": upload_id,
            "status": session["status"],
            "filename": session["filename"],
            "total_size": session["total_size"],
            "chunks_uploaded": len(session["chunks"]),
            "created_at": session["created_at"]
        }
    except HTTPException:
        raise
    except Exception as e:
        await logger.aerror(
            "Failed to get upload status",
            upload_id=upload_id,
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to get upload status")
@router.delete("/upload/{upload_id}")
async def cancel_upload(
    upload_id: str,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    """Abort an in-flight chunked upload and drop its temporary data."""
    try:
        # Require either an authenticated user or a bearer token header.
        if not current_user and not request.headers.get('authorization'):
            raise HTTPException(status_code=401, detail="Authentication required")
        cache_manager = await get_cache_manager()
        session = await cache_manager.get(f"upload_session:{upload_id}")
        if not session:
            raise HTTPException(status_code=404, detail="Upload session not found")
        # Only the session owner may cancel it.
        if current_user and session.get("user_id") != str(current_user.id):
            raise HTTPException(status_code=403, detail="Access denied")
        # Remove the session record and every cached chunk.
        await _cleanup_upload_session(upload_id, session)
        await logger.ainfo(
            "Upload cancelled",
            upload_id=upload_id,
            user_id=str(current_user.id) if current_user else "anonymous"
        )
        return {
            "message": "Upload cancelled successfully",
            "upload_id": upload_id
        }
    except HTTPException:
        raise
    except Exception as e:
        await logger.aerror(
            "Failed to cancel upload",
            upload_id=upload_id,
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to cancel upload")
# Helper functions
async def _assemble_file_chunks(upload_id: str, upload_session: Dict[str, Any]) -> bytes:
    """Reassemble the full file from its cached chunks.

    Chunks are read in ascending offset order and must form a gapless,
    contiguous byte range starting at offset 0.

    Args:
        upload_id: the upload session id the chunks were stored under.
        upload_session: cached session dict; its "chunks" maps offset -> size.

    Returns:
        The complete file content as bytes.

    Raises:
        Exception: if a chunk is missing from the cache or an offset does not
            line up with the bytes assembled so far.
    """
    cache_manager = await get_cache_manager()
    # Sort chunk offsets numerically (cache keys are strings)
    chunk_positions = sorted(int(pos) for pos in upload_session["chunks"].keys())
    parts: List[bytes] = []
    expected_offset = 0
    for position in chunk_positions:
        chunk_key = f"upload_chunk:{upload_id}:{position}"
        chunk_data = await cache_manager.get(chunk_key)
        if not chunk_data:
            raise Exception(f"Missing chunk at position {position}")
        # Chunk payloads are stored base64-encoded
        chunk_bytes = base64.b64decode(chunk_data["data"])
        # Verify contiguity against the running offset
        if position != expected_offset:
            raise Exception(f"Chunk position mismatch: expected {expected_offset}, got {position}")
        parts.append(chunk_bytes)
        expected_offset += len(chunk_bytes)
    # Single join instead of repeated `+=` (avoids quadratic byte copying)
    return b"".join(parts)
async def _create_content_record(
    filename: str,
    content_type: str,
    file_size: int,
    user_id: Optional[UUID]
) -> str:
    """Persist a Content row for a completed upload and return its id.

    Args:
        filename: decoded original filename; stored as the content title.
        content_type: MIME type reported by the client.
        file_size: size of the assembled file in bytes.
        user_id: owner id, or None for anonymous uploads.

    Returns:
        The new content record's id as a string.

    Raises:
        Exception: re-raised after logging if the database write fails.
    """
    try:
        async with db_manager.get_session() as session:
            content = Content(
                id=uuid4(),
                user_id=user_id,
                title=filename,
                # Fix: interpolate the real filename; the previous f-string had
                # a literal "(unknown)" placeholder and no interpolation at all
                description=f"Uploaded file: {filename}",
                content_type=content_type,
                file_size=file_size,
                status="completed",
                visibility="private"
            )
            session.add(content)
            await session.commit()
            await session.refresh(content)
            return str(content.id)
    except Exception as e:
        await logger.aerror(
            "Failed to create content record",
            filename=filename,
            error=str(e)
        )
        raise
async def _cleanup_upload_session(upload_id: str, upload_session: Dict[str, Any]) -> None:
    """Best-effort removal of an upload session and all of its cached chunks.

    Any failure is logged as a warning and swallowed, so this can be called
    from both the success and the cancellation path without extra guards.
    """
    try:
        cache_manager = await get_cache_manager()
        # Drop every cached chunk belonging to this session.
        for position in upload_session["chunks"]:
            await cache_manager.delete(f"upload_chunk:{upload_id}:{position}")
        # Then drop the session record itself.
        await cache_manager.delete(f"upload_session:{upload_id}")
        await logger.adebug(
            "Upload session cleaned up",
            upload_id=upload_id,
            chunks_deleted=len(upload_session["chunks"])
        )
    except Exception as e:
        await logger.awarning(
            "Failed to cleanup upload session",
            upload_id=upload_id,
            error=str(e)
        )
# Additional endpoints for compatibility
@router.post("/api/v1/storage/upload")
async def initiate_upload_v1(
    request: Request,
    current_user: User = Depends(require_auth)
):
    """Create an upload session id (v1 API compatibility stub)."""
    try:
        # Minimal stub: hand back a fresh session id without any state.
        return {
            "upload_id": str(uuid4()),
            "status": "ready",
            "message": "Upload session created"
        }
    except Exception as e:
        await logger.aerror(
            "Failed to initiate upload",
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to initiate upload")
@router.get("/api/v1/storage/quota")
async def get_storage_quota(
    request: Request,
    current_user: User = Depends(require_auth)
):
    """
    Report the caller's storage quota usage.

    Returns used/available bytes against the configured per-user limit,
    plus the current file count.
    """
    try:
        # Per-user byte limit, overridable via settings
        max_storage = getattr(settings, 'MAX_STORAGE_PER_USER', 1024 * 1024 * 1024)  # 1GB
        # Aggregate in SQL instead of loading every Content row into memory.
        # SQL SUM skips NULL file_size values, matching the old `or 0` fallback;
        # COALESCE covers the zero-rows case.
        async with db_manager.get_session() as session:
            stmt = select(
                func.count(Content.id),
                func.coalesce(func.sum(Content.file_size), 0)
            ).where(Content.user_id == current_user.id)
            file_count, used_storage = (await session.execute(stmt)).one()
        # Some backends return Decimal from SUM; normalize for JSON math below
        used_storage = int(used_storage or 0)
        return {
            "quota": {
                "used_bytes": used_storage,
                "max_bytes": max_storage,
                "available_bytes": max(0, max_storage - used_storage),
                "usage_percent": round((used_storage / max_storage) * 100, 2) if max_storage > 0 else 0
            },
            "files": {
                "count": file_count,
                "max_files": getattr(settings, 'MAX_FILES_PER_USER', 1000)
            }
        }
    except Exception as e:
        await logger.aerror(
            "Failed to get storage quota",
            user_id=str(current_user.id),
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to get quota information")