"""Compatible user models for MariaDB.

Defines three declarative models built on the project's ``BaseModel``:

* ``User`` - account, profile, quota, licence and API-usage data.
* ``UserSession`` - per-login session tokens and their status.
* ``UserPreferences`` - per-user UI, notification, privacy and upload settings.

``BaseModel`` is defined elsewhere; this module relies on it for ``id``,
``created_at`` and ``to_dict(exclude=...)``.
"""

from datetime import datetime, timedelta, timezone
from typing import Optional, List, Dict, Any

from sqlalchemy import Column, String, Boolean, Text, Integer, DateTime, Index
from sqlalchemy.orm import relationship

from app.core.models.base_compatible import BaseModel

# Column defaults, named so the helper methods can fall back to the same
# values when an attribute has not been populated yet.
DEFAULT_STORAGE_LIMIT_BYTES = 100 * 1024 * 1024  # 100MB
DEFAULT_API_CALLS_LIMIT = 1000


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value as the deprecated ``datetime.utcnow()`` so it
    stays comparable with the naive ``DateTime`` columns used below.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _value_or(value: Optional[int], default: int) -> int:
    """Return ``value`` unless it is ``None``, in which case return ``default``.

    Column-level ``default=`` values are only applied when the row is
    inserted, so attributes of a freshly constructed object are still
    ``None``.  An explicit ``0`` is preserved (unlike ``value or default``).
    """
    return default if value is None else value


class User(BaseModel):
    """User model compatible with existing MariaDB schema."""

    __tablename__ = "users"

    # Basic user information
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(100), unique=True, nullable=True, index=True)
    password_hash = Column(String(255), nullable=False)

    # User status and flags
    is_active = Column(Boolean, default=True, nullable=False)
    is_verified = Column(Boolean, default=False, nullable=False)
    is_admin = Column(Boolean, default=False, nullable=False)

    # Profile information
    first_name = Column(String(50), nullable=True)
    last_name = Column(String(50), nullable=True)
    bio = Column(Text, nullable=True)
    avatar_url = Column(String(255), nullable=True)

    # System tracking
    last_login = Column(DateTime, nullable=True)
    login_count = Column(Integer, default=0, nullable=False)

    # Storage and limits
    # NOTE(review): a plain MariaDB INT is signed 32-bit (max 2,147,483,647),
    # which would cap these byte counters at ~2 GiB - confirm the live schema.
    storage_used = Column(Integer, default=0, nullable=False)  # bytes
    storage_limit = Column(
        Integer, default=DEFAULT_STORAGE_LIMIT_BYTES, nullable=False
    )  # 100MB default

    # TON Blockchain integration
    # NOTE(review): same 32-bit concern as above - ~2.147e9 nanotons is only
    # about 2.1 TON; confirm the live column type.
    ton_wallet_address = Column(String(100), nullable=True, index=True)
    ton_balance = Column(Integer, default=0, nullable=False)  # nanotons

    # License and subscription
    license_key = Column(String(100), nullable=True, index=True)
    license_expires = Column(DateTime, nullable=True)
    subscription_level = Column(String(20), default="free", nullable=False)

    # API access
    api_key = Column(String(100), nullable=True, unique=True, index=True)
    api_calls_count = Column(Integer, default=0, nullable=False)
    api_calls_limit = Column(
        Integer, default=DEFAULT_API_CALLS_LIMIT, nullable=False
    )

    # Relationships will be defined when we create content models

    # Table indexes for performance
    # NOTE(review): ton_wallet_address already has index=True on the column,
    # so idx_users_ton_wallet looks like a duplicate single-column index -
    # confirm against the live schema before removing either.
    __table_args__ = (
        Index('idx_users_username_active', 'username', 'is_active'),
        Index('idx_users_email_verified', 'email', 'is_verified'),
        Index('idx_users_ton_wallet', 'ton_wallet_address'),
        Index('idx_users_license', 'license_key', 'license_expires'),
    )

    def check_storage_limit(self, file_size: int) -> bool:
        """Check if user can upload file of given size.

        Args:
            file_size: Size of the prospective upload, in bytes.

        Returns:
            True when ``storage_used + file_size`` does not exceed
            ``storage_limit``.
        """
        used = _value_or(self.storage_used, 0)
        limit = _value_or(self.storage_limit, DEFAULT_STORAGE_LIMIT_BYTES)
        return (used + file_size) <= limit

    def update_storage_usage(self, size_change: int) -> None:
        """Update user's storage usage.

        Args:
            size_change: Bytes to add (positive) or release (negative).
                The stored total is clamped so it never drops below zero.
        """
        self.storage_used = max(0, _value_or(self.storage_used, 0) + size_change)

    def is_license_valid(self) -> bool:
        """Check if user's license is valid.

        Returns:
            True only when both ``license_key`` and ``license_expires`` are
            set and the expiry lies after the current UTC time.
        """
        if not self.license_key or not self.license_expires:
            return False
        return self.license_expires > _utcnow()

    def can_make_api_call(self) -> bool:
        """Check if user can make API call.

        Returns:
            True while ``api_calls_count`` is strictly below
            ``api_calls_limit``.
        """
        count = _value_or(self.api_calls_count, 0)
        limit = _value_or(self.api_calls_limit, DEFAULT_API_CALLS_LIMIT)
        return count < limit

    def increment_api_calls(self) -> None:
        """Increment API calls counter."""
        # An unset counter (object not yet flushed) counts as zero.
        self.api_calls_count = _value_or(self.api_calls_count, 0) + 1

    def reset_api_calls(self) -> None:
        """Reset API calls counter (for monthly reset)."""
        self.api_calls_count = 0

    def get_storage_usage_percent(self) -> float:
        """Get storage usage as percentage.

        Returns:
            ``storage_used / storage_limit * 100``, or ``0.0`` when the limit
            is zero (avoids division by zero).
        """
        limit = _value_or(self.storage_limit, DEFAULT_STORAGE_LIMIT_BYTES)
        if limit == 0:
            return 0.0
        return (_value_or(self.storage_used, 0) / limit) * 100

    def get_api_usage_percent(self) -> float:
        """Get API usage as percentage.

        Returns:
            ``api_calls_count / api_calls_limit * 100``, or ``0.0`` when the
            limit is zero (avoids division by zero).
        """
        limit = _value_or(self.api_calls_limit, DEFAULT_API_CALLS_LIMIT)
        if limit == 0:
            return 0.0
        return (_value_or(self.api_calls_count, 0) / limit) * 100

    def get_display_name(self) -> str:
        """Get user's display name.

        Returns:
            "first last" when both names are set, the first name alone when
            only it is set, otherwise the username.
        """
        if self.first_name and self.last_name:
            return f"{self.first_name} {self.last_name}"
        elif self.first_name:
            return self.first_name
        return self.username

    def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]:
        """Convert to dictionary with option to exclude sensitive data.

        Args:
            include_sensitive: When False (the default), ``password_hash``,
                ``api_key`` and ``license_key`` are passed to the base
                ``to_dict`` as excluded fields.

        Returns:
            The base-class dictionary extended with the computed keys
            ``display_name``, ``storage_usage_percent``,
            ``api_usage_percent`` and ``license_valid``.
        """
        exclude = set()
        if not include_sensitive:
            exclude.update({"password_hash", "api_key", "license_key"})

        data = super().to_dict(exclude=exclude)

        # Add computed fields
        data.update({
            "display_name": self.get_display_name(),
            "storage_usage_percent": self.get_storage_usage_percent(),
            "api_usage_percent": self.get_api_usage_percent(),
            "license_valid": self.is_license_valid(),
        })
        return data

    def to_public_dict(self) -> Dict[str, Any]:
        """Convert to public dictionary (minimal user info).

        Returns:
            A dictionary with id, username, display name, avatar URL,
            verification flag, subscription level and the ISO-formatted
            ``created_at`` (``None`` when ``created_at`` is unset).
        """
        return {
            "id": self.id,
            "username": self.username,
            "display_name": self.get_display_name(),
            "avatar_url": self.avatar_url,
            "is_verified": self.is_verified,
            "subscription_level": self.subscription_level,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }


class UserSession(BaseModel):
    """User session model for tracking active sessions."""

    __tablename__ = "user_sessions"

    user_id = Column(Integer, nullable=False, index=True)
    session_token = Column(String(255), unique=True, nullable=False, index=True)
    refresh_token = Column(String(255), unique=True, nullable=True, index=True)

    # Session metadata
    ip_address = Column(String(45), nullable=True)  # IPv6 support
    user_agent = Column(Text, nullable=True)
    device_info = Column(Text, nullable=True)

    # Session status
    is_active = Column(Boolean, default=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    last_activity = Column(DateTime, default=_utcnow, nullable=False)

    # Security flags
    is_suspicious = Column(Boolean, default=False, nullable=False)
    failed_attempts = Column(Integer, default=0, nullable=False)

    # NOTE(review): session_token already has unique=True/index=True on the
    # column, so idx_sessions_token looks like a duplicate index - confirm
    # against the live schema before removing either.
    __table_args__ = (
        Index('idx_sessions_user_active', 'user_id', 'is_active'),
        Index('idx_sessions_token', 'session_token'),
        Index('idx_sessions_expires', 'expires_at'),
    )

    def is_expired(self) -> bool:
        """Check if session is expired.

        Returns:
            True when the current UTC time is past ``expires_at``.  A session
            with no expiry set is reported as expired rather than raising.
        """
        if self.expires_at is None:
            return True
        return _utcnow() > self.expires_at

    def is_valid(self) -> bool:
        """Check if session is valid.

        Returns:
            True when the session is flagged active and has not expired.
        """
        return bool(self.is_active) and not self.is_expired()

    def extend_session(self, hours: int = 24) -> None:
        """Extend session expiration.

        Args:
            hours: Number of hours from now at which the session should
                expire.  ``last_activity`` is set to the same "now".
        """
        # Read the clock once so both timestamps share one reference point.
        now = _utcnow()
        self.expires_at = now + timedelta(hours=hours)
        self.last_activity = now

    def mark_suspicious(self) -> None:
        """Mark session as suspicious."""
        self.is_suspicious = True
        # An unset counter (object not yet flushed) counts as zero.
        self.failed_attempts = _value_or(self.failed_attempts, 0) + 1

    def deactivate(self) -> None:
        """Deactivate session."""
        self.is_active = False


class UserPreferences(BaseModel):
    """User preferences and settings."""

    __tablename__ = "user_preferences"

    user_id = Column(Integer, unique=True, nullable=False, index=True)

    # UI preferences
    theme = Column(String(20), default="light", nullable=False)
    language = Column(String(10), default="en", nullable=False)
    timezone = Column(String(50), default="UTC", nullable=False)

    # Notification preferences
    email_notifications = Column(Boolean, default=True, nullable=False)
    upload_notifications = Column(Boolean, default=True, nullable=False)
    storage_alerts = Column(Boolean, default=True, nullable=False)

    # Privacy settings
    public_profile = Column(Boolean, default=False, nullable=False)
    show_email = Column(Boolean, default=False, nullable=False)
    allow_indexing = Column(Boolean, default=True, nullable=False)

    # Upload preferences
    auto_optimize_images = Column(Boolean, default=True, nullable=False)
    default_privacy = Column(String(20), default="private", nullable=False)
    max_file_size_mb = Column(Integer, default=10, nullable=False)

    # Cache and performance
    cache_thumbnails = Column(Boolean, default=True, nullable=False)
    preload_content = Column(Boolean, default=False, nullable=False)

    def to_dict(self) -> Dict[str, Any]:
        """Convert preferences to dictionary.

        Returns:
            The base-class dictionary with ``user_id`` excluded.
        """
        return super().to_dict(exclude={"user_id"})

    @classmethod
    def get_default_preferences(cls) -> Dict[str, Any]:
        """Get default user preferences.

        Returns:
            A new dictionary mirroring the column defaults declared on this
            model; keep the two in sync when changing either.
        """
        return {
            "theme": "light",
            "language": "en",
            "timezone": "UTC",
            "email_notifications": True,
            "upload_notifications": True,
            "storage_alerts": True,
            "public_profile": False,
            "show_email": False,
            "allow_indexing": True,
            "auto_optimize_images": True,
            "default_privacy": "private",
            "max_file_size_mb": 10,
            "cache_thumbnails": True,
            "preload_content": False,
        }