""" Comprehensive validation schemas using Pydantic for request/response validation. Provides type safety, data validation, and automatic documentation generation. """ from datetime import datetime from typing import Dict, List, Optional, Any, Union from uuid import UUID from enum import Enum from pydantic import BaseModel, Field, validator, model_validator from pydantic.networks import EmailStr, HttpUrl class ContentTypeEnum(str, Enum): """Supported content types.""" AUDIO = "audio" VIDEO = "video" IMAGE = "image" DOCUMENT = "document" ARCHIVE = "archive" OTHER = "other" class VisibilityEnum(str, Enum): """Content visibility levels.""" PUBLIC = "public" PRIVATE = "private" UNLISTED = "unlisted" RESTRICTED = "restricted" class StatusEnum(str, Enum): """Content processing status.""" PENDING = "pending" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" DELETED = "deleted" class PermissionEnum(str, Enum): """User permissions.""" READ = "read" WRITE = "write" DELETE = "delete" ADMIN = "admin" class BaseSchema(BaseModel): """Base schema with common configuration.""" model_config = { "use_enum_values": True, "validate_assignment": True, "populate_by_name": True, "json_encoders": { datetime: lambda v: v.isoformat(), UUID: lambda v: str(v) } } class ContentSchema(BaseSchema): """Schema for content creation.""" title: str = Field(..., min_length=1, max_length=255, description="Content title") description: Optional[str] = Field(None, max_length=2000, description="Content description") content_type: ContentTypeEnum = Field(..., description="Type of content") file_size: Optional[int] = Field(None, ge=0, le=10737418240, description="File size in bytes (max 10GB)") visibility: VisibilityEnum = Field(VisibilityEnum.PRIVATE, description="Content visibility") tags: List[str] = Field(default_factory=list, max_items=20, description="Content tags") license_id: Optional[UUID] = Field(None, description="License ID if applicable") metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata") @validator('tags') def validate_tags(cls, v): """Validate tags format and content.""" if not v: return v # Check each tag for tag in v: if not isinstance(tag, str): raise ValueError("Tags must be strings") if len(tag) < 1 or len(tag) > 50: raise ValueError("Tag length must be between 1 and 50 characters") if not tag.replace('-', '').replace('_', '').isalnum(): raise ValueError("Tags can only contain alphanumeric characters, hyphens, and underscores") # Remove duplicates while preserving order seen = set() unique_tags = [] for tag in v: tag_lower = tag.lower() if tag_lower not in seen: seen.add(tag_lower) unique_tags.append(tag) return unique_tags @validator('metadata') def validate_metadata(cls, v): """Validate metadata structure.""" if not v: return v # Check metadata size (JSON serialized) import json try: serialized = json.dumps(v) if len(serialized) > 10000: # Max 10KB of metadata raise ValueError("Metadata too large (max 10KB)") except (TypeError, ValueError) as e: raise ValueError(f"Invalid metadata format: {e}") return v class ContentUpdateSchema(BaseSchema): """Schema for content updates.""" title: Optional[str] = Field(None, min_length=1, max_length=255) description: Optional[str] = Field(None, max_length=2000) visibility: Optional[VisibilityEnum] = None tags: Optional[List[str]] = Field(None, max_items=20) license_id: Optional[UUID] = None status: Optional[StatusEnum] = None @validator('tags') def validate_tags(cls, v): """Validate tags if provided.""" if v is None: return v return ContentSchema.validate_tags(v) class ContentSearchSchema(BaseSchema): """Schema for content search requests.""" query: Optional[str] = Field(None, min_length=1, max_length=200, description="Search query") content_type: Optional[ContentTypeEnum] = None status: Optional[StatusEnum] = None tags: Optional[List[str]] = Field(None, max_items=10) visibility: Optional[VisibilityEnum] = None date_from: Optional[datetime] = None date_to: Optional[datetime] = None sort_by: Optional[str] = Field("updated_at", pattern="^(created_at|updated_at|title|file_size)$") sort_order: Optional[str] = Field("desc", pattern="^(asc|desc)$") page: int = Field(1, ge=1, le=1000) per_page: int = Field(20, ge=1, le=100) @model_validator(mode='before') def validate_date_range(cls, values): """Validate date range.""" date_from = values.get('date_from') date_to = values.get('date_to') if date_from and date_to and date_from >= date_to: raise ValueError("date_from must be before date_to") return values class UserRegistrationSchema(BaseSchema): """Schema for user registration.""" username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_.-]+$") email: EmailStr = Field(..., description="Valid email address") password: str = Field(..., min_length=8, max_length=128, description="Password (min 8 characters)") full_name: Optional[str] = Field(None, max_length=100) @validator('password') def validate_password(cls, v): """Validate password strength.""" if len(v) < 8: raise ValueError("Password must be at least 8 characters long") # Check for required character types has_upper = any(c.isupper() for c in v) has_lower = any(c.islower() for c in v) has_digit = any(c.isdigit() for c in v) has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in v) if not (has_upper and has_lower and has_digit and has_special): raise ValueError( "Password must contain at least one uppercase letter, " "one lowercase letter, one digit, and one special character" ) return v class UserLoginSchema(BaseSchema): """Schema for user login.""" username: str = Field(..., min_length=1, max_length=50) password: str = Field(..., min_length=1, max_length=128) remember_me: bool = Field(False, description="Keep session longer") class UserUpdateSchema(BaseSchema): """Schema for user profile updates.""" full_name: Optional[str] = Field(None, max_length=100) email: Optional[EmailStr] = None bio: Optional[str] = Field(None, max_length=500) avatar_url: Optional[HttpUrl] = None settings: Optional[Dict[str, Any]] = None @validator('settings') def validate_settings(cls, v): """Validate user settings.""" if not v: return v # Allowed settings keys allowed_keys = { 'notifications', 'privacy', 'theme', 'language', 'timezone', 'auto_save', 'quality_preference' } for key in v.keys(): if key not in allowed_keys: raise ValueError(f"Invalid settings key: {key}") return v class StorageUploadSchema(BaseSchema): """Schema for file upload initiation.""" filename: str = Field(..., min_length=1, max_length=255) file_size: int = Field(..., ge=1, le=10737418240) # Max 10GB content_type: str = Field(..., min_length=1, max_length=100) chunk_size: Optional[int] = Field(1048576, ge=65536, le=10485760) # 64KB to 10MB @validator('filename') def validate_filename(cls, v): """Validate filename format.""" import re # Check for dangerous characters if re.search(r'[<>:"/\\|?*\x00-\x1f]', v): raise ValueError("Filename contains invalid characters") # Check for reserved names (Windows) reserved_names = { 'CON', 'PRN', 'AUX', 'NUL', 'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9', 'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9' } name_part = v.split('.')[0].upper() if name_part in reserved_names: raise ValueError("Filename uses reserved name") return v class ChunkUploadSchema(BaseSchema): """Schema for chunk upload.""" upload_id: UUID = Field(..., description="Upload session ID") chunk_index: int = Field(..., ge=0, description="Chunk sequence number") chunk_hash: str = Field(..., min_length=64, max_length=64, description="SHA256 hash of chunk") is_final: bool = Field(False, description="Is this the final chunk") class BlockchainTransactionSchema(BaseSchema): """Schema for blockchain transactions.""" transaction_type: str = Field(..., pattern="^(transfer|mint|burn|stake|unstake)$") amount: Optional[int] = Field(None, ge=0, description="Amount in nanotons") recipient_address: Optional[str] = Field(None, min_length=48, max_length=48) message: Optional[str] = Field(None, max_length=500) @validator('recipient_address') def validate_ton_address(cls, v): """Validate TON address format.""" if not v: return v # Basic TON address validation import re if not re.match(r'^[a-zA-Z0-9_-]{48}$', v): raise ValueError("Invalid TON address format") return v class LicenseSchema(BaseSchema): """Schema for license information.""" name: str = Field(..., min_length=1, max_length=100) description: Optional[str] = Field(None, max_length=1000) url: Optional[HttpUrl] = None commercial_use: bool = Field(False, description="Allows commercial use") attribution_required: bool = Field(True, description="Requires attribution") share_alike: bool = Field(False, description="Requires share-alike") class AccessControlSchema(BaseSchema): """Schema for content access control.""" user_id: UUID = Field(..., description="User to grant access to") permission: str = Field(..., pattern="^(read|write|delete|admin)$") expires_at: Optional[datetime] = Field(None, description="Access expiration time") @model_validator(mode='before') def validate_expiration(cls, values): """Validate access expiration.""" expires_at = values.get('expires_at') if expires_at and expires_at <= datetime.utcnow(): raise ValueError("Expiration time must be in the future") return values class ApiKeySchema(BaseSchema): """Schema for API key creation.""" name: str = Field(..., min_length=1, max_length=100, description="API key name") permissions: List[str] = Field(..., min_items=1, description="List of permissions") expires_at: Optional[datetime] = Field(None, description="Key expiration time") @validator('permissions') def validate_permissions(cls, v): """Validate permission format.""" valid_permissions = { 'content.read', 'content.create', 'content.update', 'content.delete', 'storage.upload', 'storage.download', 'storage.delete', 'user.read', 'user.update', 'admin.read', 'admin.write' } for perm in v: if perm not in valid_permissions: raise ValueError(f"Invalid permission: {perm}") return list(set(v)) # Remove duplicates class WebhookSchema(BaseSchema): """Schema for webhook configuration.""" url: HttpUrl = Field(..., description="Webhook endpoint URL") events: List[str] = Field(..., min_items=1, description="Events to subscribe to") secret: Optional[str] = Field(None, min_length=16, max_length=64, description="Webhook secret") active: bool = Field(True, description="Whether webhook is active") @validator('events') def validate_events(cls, v): """Validate webhook events.""" valid_events = { 'content.created', 'content.updated', 'content.deleted', 'user.registered', 'user.updated', 'upload.completed', 'blockchain.transaction', 'system.error' } for event in v: if event not in valid_events: raise ValueError(f"Invalid event: {event}") return list(set(v)) # Response schemas class ContentResponseSchema(BaseSchema): """Schema for content response.""" id: UUID title: str description: Optional[str] content_type: ContentTypeEnum file_size: int status: StatusEnum visibility: VisibilityEnum tags: List[str] created_at: datetime updated_at: datetime user_id: UUID class UserResponseSchema(BaseSchema): """Schema for user response.""" id: UUID username: str email: EmailStr full_name: Optional[str] created_at: datetime is_active: bool permissions: List[str] class ErrorResponseSchema(BaseSchema): """Schema for error responses.""" error: str = Field(..., description="Error message") code: str = Field(..., description="Error code") details: Optional[Dict[str, Any]] = Field(None, description="Additional error details") timestamp: datetime = Field(default_factory=datetime.utcnow) class SuccessResponseSchema(BaseSchema): """Schema for success responses.""" message: str = Field(..., description="Success message") data: Optional[Dict[str, Any]] = Field(None, description="Response data") timestamp: datetime = Field(default_factory=datetime.utcnow)