372 lines
14 KiB
Python
372 lines
14 KiB
Python
"""
|
|
Comprehensive validation schemas using Pydantic for request/response validation.
|
|
Provides type safety, data validation, and automatic documentation generation.
|
|
"""
|
|
|
|
import json
import re
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional, Any, Union
from uuid import UUID

from pydantic import BaseModel, Field, field_validator, model_validator, validator
from pydantic.networks import EmailStr, HttpUrl
|
|
|
|
class ContentTypeEnum(str, Enum):
    """Closed set of content categories.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    # Media categories.
    AUDIO = "audio"
    VIDEO = "video"
    IMAGE = "image"

    # File-like categories.
    DOCUMENT = "document"
    ARCHIVE = "archive"

    # Catch-all for anything not listed above.
    OTHER = "other"
|
|
|
|
class VisibilityEnum(str, Enum):
    """Closed set of visibility levels a piece of content can have.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    PUBLIC = "public"
    PRIVATE = "private"
    UNLISTED = "unlisted"
    RESTRICTED = "restricted"
|
|
|
|
class StatusEnum(str, Enum):
    """Closed set of processing states for content.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    DELETED = "deleted"
|
|
|
|
class PermissionEnum(str, Enum):
    """Closed set of permissions that can be held by a user.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
|
|
|
|
class BaseSchema(BaseModel):
    """Shared parent for the schemas in this module; carries common model settings."""

    model_config = dict(
        # Keep enum fields as their plain values rather than Enum members.
        use_enum_values=True,
        # Re-run validation whenever an attribute is assigned.
        validate_assignment=True,
        # Allow population by field name as well as by alias.
        populate_by_name=True,
        # Custom JSON encoders for non-primitive types.
        json_encoders={
            datetime: lambda moment: moment.isoformat(),
            UUID: lambda identifier: str(identifier),
        },
    )
|
|
|
|
class ContentSchema(BaseSchema):
    """Schema for content creation.

    Validates the declared field constraints and additionally normalizes
    ``tags`` (format check plus case-insensitive de-duplication) and bounds
    the size of ``metadata``.
    """

    title: str = Field(..., min_length=1, max_length=255, description="Content title")
    description: Optional[str] = Field(None, max_length=2000, description="Content description")
    content_type: ContentTypeEnum = Field(..., description="Type of content")
    file_size: Optional[int] = Field(None, ge=0, le=10737418240, description="File size in bytes (max 10GB)")
    visibility: VisibilityEnum = Field(VisibilityEnum.PRIVATE, description="Content visibility")
    # ``max_length`` is the Pydantic v2 spelling of the deprecated ``max_items``.
    tags: List[str] = Field(default_factory=list, max_length=20, description="Content tags")
    license_id: Optional[UUID] = Field(None, description="License ID if applicable")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata")

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v):
        """Validate tag format and drop case-insensitive duplicates.

        Args:
            v: The list of tags to check. A falsy value (``None`` or an
                empty list) is returned unchanged.

        Returns:
            The tags with duplicates removed. Comparison ignores case and the
            first spelling encountered is kept, in original order.

        Raises:
            ValueError: If a tag is not a string, is not 1-50 characters
                long, or contains anything other than alphanumeric
                characters, hyphens, and underscores.
        """
        if not v:
            return v

        for tag in v:
            if not isinstance(tag, str):
                raise ValueError("Tags must be strings")
            if len(tag) < 1 or len(tag) > 50:
                raise ValueError("Tag length must be between 1 and 50 characters")
            # Strip the two permitted punctuation characters, then require
            # the remainder to be alphanumeric.
            if not tag.replace('-', '').replace('_', '').isalnum():
                raise ValueError("Tags can only contain alphanumeric characters, hyphens, and underscores")

        # Remove duplicates while preserving order.
        seen = set()
        unique_tags = []
        for tag in v:
            tag_lower = tag.lower()
            if tag_lower not in seen:
                seen.add(tag_lower)
                unique_tags.append(tag)

        return unique_tags

    @field_validator('metadata')
    @classmethod
    def validate_metadata(cls, v):
        """Ensure metadata is JSON-serializable and not too large.

        Args:
            v: The metadata mapping. A falsy value (``None`` or an empty
                dict) is returned unchanged.

        Returns:
            The metadata, unmodified.

        Raises:
            ValueError: If the metadata cannot be serialized to JSON, or if
                its JSON form is longer than 10000 characters.
        """
        if not v:
            return v

        # Only the serialization itself sits inside the ``try``; the size
        # check below raises ValueError and must not be re-wrapped as an
        # "Invalid metadata format" error.
        try:
            serialized = json.dumps(v)
        except (TypeError, ValueError) as e:
            raise ValueError(f"Invalid metadata format: {e}") from e

        if len(serialized) > 10000:  # Max 10KB of metadata
            raise ValueError("Metadata too large (max 10KB)")

        return v
|
|
|
|
class ContentUpdateSchema(BaseSchema):
    """Schema for content updates.

    Every field is optional and defaults to ``None``.
    """

    title: Optional[str] = Field(None, min_length=1, max_length=255)
    description: Optional[str] = Field(None, max_length=2000)
    visibility: Optional[VisibilityEnum] = None
    # ``max_length`` is the Pydantic v2 spelling of the deprecated ``max_items``.
    tags: Optional[List[str]] = Field(None, max_length=20)
    license_id: Optional[UUID] = None
    status: Optional[StatusEnum] = None

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v):
        """Validate tags when they are provided.

        Args:
            v: The list of tags, or ``None`` when the field is not set.

        Returns:
            ``None`` unchanged, otherwise whatever
            ``ContentSchema.validate_tags`` returns for ``v``.

        Raises:
            ValueError: Propagated from ``ContentSchema.validate_tags``.
        """
        if v is None:
            return v
        # Reuse the creation schema's rules so both schemas agree on tags.
        return ContentSchema.validate_tags(v)
|
|
|
|
class ContentSearchSchema(BaseSchema):
    """Schema for content search requests.

    Carries optional filters, sorting, and pagination parameters, and checks
    that the date range is ordered when both ends are supplied.
    """

    query: Optional[str] = Field(None, min_length=1, max_length=200, description="Search query")
    content_type: Optional[ContentTypeEnum] = None
    status: Optional[StatusEnum] = None
    # ``max_length`` is the Pydantic v2 spelling of the deprecated ``max_items``.
    tags: Optional[List[str]] = Field(None, max_length=10)
    visibility: Optional[VisibilityEnum] = None
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None
    sort_by: Optional[str] = Field("updated_at", pattern="^(created_at|updated_at|title|file_size)$")
    sort_order: Optional[str] = Field("desc", pattern="^(asc|desc)$")
    page: int = Field(1, ge=1, le=1000)
    per_page: int = Field(20, ge=1, le=100)

    @model_validator(mode='after')
    def validate_date_range(self):
        """Check that ``date_from`` is strictly earlier than ``date_to``.

        Runs after field validation so both values are already ``datetime``
        objects; a ``before`` validator would see the raw, unparsed input.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If both dates are set and ``date_from`` is not before
                ``date_to``, or if one is timezone-aware and the other naive.
        """
        date_from = self.date_from
        date_to = self.date_to

        if date_from is not None and date_to is not None:
            try:
                out_of_order = date_from >= date_to
            except TypeError as exc:
                # Comparing a naive datetime with an aware one raises
                # TypeError; report it as a validation failure instead.
                raise ValueError(
                    "date_from and date_to must both be timezone-aware or both be naive"
                ) from exc
            if out_of_order:
                raise ValueError("date_from must be before date_to")

        return self
|
|
|
|
class UserRegistrationSchema(BaseSchema):
    """Schema for user registration.

    Besides the declared field constraints, the password must contain an
    uppercase letter, a lowercase letter, a digit, and a special character.
    """

    username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_.-]+$")
    email: EmailStr = Field(..., description="Valid email address")
    password: str = Field(..., min_length=8, max_length=128, description="Password (min 8 characters)")
    full_name: Optional[str] = Field(None, max_length=100)

    @field_validator('password')
    @classmethod
    def validate_password(cls, v):
        """Validate password strength.

        Args:
            v: The candidate password.

        Returns:
            The password, unmodified.

        Raises:
            ValueError: If the password is shorter than 8 characters or lacks
                any of the four required character classes.
        """
        # Also enforced by the field's ``min_length``; kept so the method
        # gives the same answer when called directly.
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters long")

        # Check for required character types.
        has_upper = any(c.isupper() for c in v)
        has_lower = any(c.islower() for c in v)
        has_digit = any(c.isdigit() for c in v)
        has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in v)

        if not (has_upper and has_lower and has_digit and has_special):
            raise ValueError(
                "Password must contain at least one uppercase letter, "
                "one lowercase letter, one digit, and one special character"
            )

        return v
|
|
|
|
class UserLoginSchema(BaseSchema):
    """Credentials payload for a login request."""

    # Both credentials are required and length-bounded.
    username: str = Field(
        ...,
        min_length=1,
        max_length=50,
    )
    password: str = Field(
        ...,
        min_length=1,
        max_length=128,
    )

    # Optional flag, off unless the client sets it.
    remember_me: bool = Field(
        False,
        description="Keep session longer",
    )
|
|
|
|
class UserUpdateSchema(BaseSchema):
    """Schema for user profile updates.

    Every field is optional and defaults to ``None``. ``settings`` may only
    use a fixed set of keys.
    """

    full_name: Optional[str] = Field(None, max_length=100)
    email: Optional[EmailStr] = None
    bio: Optional[str] = Field(None, max_length=500)
    avatar_url: Optional[HttpUrl] = None
    settings: Optional[Dict[str, Any]] = None

    @field_validator('settings')
    @classmethod
    def validate_settings(cls, v):
        """Reject settings that use keys outside the allowed set.

        Args:
            v: The settings mapping. A falsy value (``None`` or an empty
                dict) is returned unchanged.

        Returns:
            The settings, unmodified.

        Raises:
            ValueError: For the first key, in iteration order, that is not an
                allowed settings key.
        """
        if not v:
            return v

        # Allowed settings keys; the values are not inspected.
        allowed_keys = {
            'notifications', 'privacy', 'theme', 'language',
            'timezone', 'auto_save', 'quality_preference',
        }

        for key in v:
            if key not in allowed_keys:
                raise ValueError(f"Invalid settings key: {key}")

        return v
|
|
|
|
class StorageUploadSchema(BaseSchema):
    """Schema for file upload initiation.

    Describes the file about to be uploaded and rejects filenames that are
    unsafe to use as a single path component.
    """

    filename: str = Field(..., min_length=1, max_length=255)
    file_size: int = Field(..., ge=1, le=10737418240)  # Max 10GB
    content_type: str = Field(..., min_length=1, max_length=100)
    chunk_size: Optional[int] = Field(1048576, ge=65536, le=10485760)  # 64KB to 10MB

    @field_validator('filename')
    @classmethod
    def validate_filename(cls, v):
        """Validate filename format.

        Args:
            v: The client-supplied filename.

        Returns:
            The filename, unmodified.

        Raises:
            ValueError: If the name contains a forbidden character, consists
                only of dots and spaces, or uses a reserved Windows device
                name as its base name.
        """
        # Path separators, shell/Windows-special characters, control chars.
        if re.search(r'[<>:"/\\|?*\x00-\x1f]', v):
            raise ValueError("Filename contains invalid characters")

        # "." and ".." are directory references, and a blank name is not a
        # usable filename; none of them are caught by the character check.
        if not v.strip(' .'):
            raise ValueError("Filename must not consist solely of dots or spaces")

        # Check for reserved names (Windows).
        reserved_names = {
            'CON', 'PRN', 'AUX', 'NUL',
            'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
            'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9',
        }

        # Windows ignores trailing spaces, so "CON .txt" still names the
        # device; drop them before comparing.
        name_part = v.split('.')[0].rstrip(' ').upper()
        if name_part in reserved_names:
            raise ValueError("Filename uses reserved name")

        return v
|
|
|
|
class ChunkUploadSchema(BaseSchema):
    """Payload describing one chunk of an upload."""

    upload_id: UUID = Field(
        ...,
        description="Upload session ID",
    )
    # Zero-based or higher; negative indexes are rejected.
    chunk_index: int = Field(
        ...,
        ge=0,
        description="Chunk sequence number",
    )
    # Length is pinned to exactly 64 characters.
    chunk_hash: str = Field(
        ...,
        min_length=64,
        max_length=64,
        description="SHA256 hash of chunk",
    )
    is_final: bool = Field(
        False,
        description="Is this the final chunk",
    )
|
|
|
|
class BlockchainTransactionSchema(BaseSchema):
    """Schema for blockchain transactions.

    ``transaction_type`` is restricted to a fixed set of operations; the
    remaining fields are optional.
    """

    transaction_type: str = Field(..., pattern="^(transfer|mint|burn|stake|unstake)$")
    amount: Optional[int] = Field(None, ge=0, description="Amount in nanotons")
    recipient_address: Optional[str] = Field(None, min_length=48, max_length=48)
    message: Optional[str] = Field(None, max_length=500)

    @field_validator('recipient_address')
    @classmethod
    def validate_ton_address(cls, v):
        """Validate TON address format.

        Args:
            v: The recipient address. A falsy value (``None`` or an empty
                string) is returned unchanged.

        Returns:
            The address, unmodified.

        Raises:
            ValueError: If the address is not exactly 48 characters drawn
                from letters, digits, ``_`` and ``-``.
        """
        if not v:
            return v

        # Basic TON address validation. ``fullmatch`` anchors both ends, so a
        # trailing newline cannot slip past the way it can with ``$``.
        if not re.fullmatch(r'[a-zA-Z0-9_-]{48}', v):
            raise ValueError("Invalid TON address format")

        return v
|
|
|
|
class LicenseSchema(BaseSchema):
    """Describes a license and the usage terms it grants."""

    # Identification.
    name: str = Field(
        ...,
        min_length=1,
        max_length=100,
    )
    description: Optional[str] = Field(
        None,
        max_length=1000,
    )
    url: Optional[HttpUrl] = None

    # Usage terms; attribution is the only one that defaults to True.
    commercial_use: bool = Field(
        False,
        description="Allows commercial use",
    )
    attribution_required: bool = Field(
        True,
        description="Requires attribution",
    )
    share_alike: bool = Field(
        False,
        description="Requires share-alike",
    )
|
|
|
|
class AccessControlSchema(BaseSchema):
    """Schema for content access control.

    Grants one permission to one user, optionally until ``expires_at``,
    which must lie in the future when given.
    """

    user_id: UUID = Field(..., description="User to grant access to")
    permission: str = Field(..., pattern="^(read|write|delete|admin)$")
    expires_at: Optional[datetime] = Field(None, description="Access expiration time")

    @model_validator(mode='after')
    def validate_expiration(self):
        """Check that ``expires_at``, when set, is in the future.

        Runs after field validation so ``expires_at`` is already a
        ``datetime``; a ``before`` validator would receive the raw input
        (for example an ISO string) and fail on the comparison.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If ``expires_at`` is not later than the current time.
        """
        expires_at = self.expires_at

        if expires_at is not None:
            now = datetime.now(timezone.utc)
            if expires_at.utcoffset() is None:
                # Naive values are interpreted as UTC, as the comparison with
                # ``datetime.utcnow()`` did before; drop tzinfo so the two
                # sides are comparable.
                now = now.replace(tzinfo=None)
            if expires_at <= now:
                raise ValueError("Expiration time must be in the future")

        return self
|
|
|
|
class ApiKeySchema(BaseSchema):
    """Schema for API key creation.

    Requires a name and at least one permission drawn from a fixed set.
    """

    name: str = Field(..., min_length=1, max_length=100, description="API key name")
    # ``min_length`` is the Pydantic v2 spelling of the deprecated ``min_items``.
    permissions: List[str] = Field(..., min_length=1, description="List of permissions")
    expires_at: Optional[datetime] = Field(None, description="Key expiration time")

    @field_validator('permissions')
    @classmethod
    def validate_permissions(cls, v):
        """Validate permission names and drop duplicates.

        Args:
            v: The requested permissions.

        Returns:
            The permissions with duplicates removed, in first-seen order.

        Raises:
            ValueError: For the first permission that is not recognized.
        """
        valid_permissions = {
            'content.read', 'content.create', 'content.update', 'content.delete',
            'storage.upload', 'storage.download', 'storage.delete',
            'user.read', 'user.update', 'admin.read', 'admin.write',
        }

        for perm in v:
            if perm not in valid_permissions:
                raise ValueError(f"Invalid permission: {perm}")

        # ``dict.fromkeys`` de-duplicates while keeping insertion order;
        # ``list(set(v))`` returned the items in an arbitrary order.
        return list(dict.fromkeys(v))
|
|
|
|
class WebhookSchema(BaseSchema):
    """Schema for webhook configuration.

    Requires an endpoint URL and at least one event drawn from a fixed set.
    """

    url: HttpUrl = Field(..., description="Webhook endpoint URL")
    # ``min_length`` is the Pydantic v2 spelling of the deprecated ``min_items``.
    events: List[str] = Field(..., min_length=1, description="Events to subscribe to")
    secret: Optional[str] = Field(None, min_length=16, max_length=64, description="Webhook secret")
    active: bool = Field(True, description="Whether webhook is active")

    @field_validator('events')
    @classmethod
    def validate_events(cls, v):
        """Validate webhook event names and drop duplicates.

        Args:
            v: The events to subscribe to.

        Returns:
            The events with duplicates removed, in first-seen order.

        Raises:
            ValueError: For the first event that is not recognized.
        """
        valid_events = {
            'content.created', 'content.updated', 'content.deleted',
            'user.registered', 'user.updated', 'upload.completed',
            'blockchain.transaction', 'system.error',
        }

        for event in v:
            if event not in valid_events:
                raise ValueError(f"Invalid event: {event}")

        # ``dict.fromkeys`` de-duplicates while keeping insertion order;
        # ``list(set(v))`` returned the items in an arbitrary order.
        return list(dict.fromkeys(v))
|
|
|
|
# Response schemas
|
|
class ContentResponseSchema(BaseSchema):
    """Response payload describing a single content item."""

    # Identity and ownership.
    id: UUID
    user_id: UUID

    # Descriptive fields; ``description`` has no default but may be None.
    title: str
    description: Optional[str]
    tags: List[str]

    # Classification and state.
    content_type: ContentTypeEnum
    file_size: int
    status: StatusEnum
    visibility: VisibilityEnum

    # Timestamps.
    created_at: datetime
    updated_at: datetime
|
|
|
|
class UserResponseSchema(BaseSchema):
    """Response payload describing a single user."""

    # Identity.
    id: UUID
    username: str
    email: EmailStr
    # No default is declared, but the value itself may be None.
    full_name: Optional[str]

    # Account state.
    is_active: bool
    permissions: List[str]
    created_at: datetime
|
|
|
|
class ErrorResponseSchema(BaseSchema):
    """Envelope returned when a request fails."""

    error: str = Field(
        ...,
        description="Error message",
    )
    code: str = Field(
        ...,
        description="Error code",
    )
    details: Optional[Dict[str, Any]] = Field(
        None,
        description="Additional error details",
    )
    # Filled in at construction time; ``datetime.utcnow`` yields a naive
    # (tzinfo-less) datetime.
    timestamp: datetime = Field(
        default_factory=datetime.utcnow,
    )
|
|
|
|
class SuccessResponseSchema(BaseSchema):
    """Envelope returned when a request succeeds."""

    message: str = Field(
        ...,
        description="Success message",
    )
    data: Optional[Dict[str, Any]] = Field(
        None,
        description="Response data",
    )
    # Filled in at construction time; ``datetime.utcnow`` yields a naive
    # (tzinfo-less) datetime.
    timestamp: datetime = Field(
        default_factory=datetime.utcnow,
    )