uploader-bot/app/core/models/user_compatible.py

247 lines
9.2 KiB
Python

"""Compatible user models for MariaDB."""
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import Column, String, Boolean, Text, Integer, DateTime, Index
from sqlalchemy.orm import relationship
from app.core.models.base_compatible import BaseModel
class User(BaseModel):
    """User account model compatible with the existing MariaDB ``users`` table.

    Besides the column mapping, the class provides small helpers for the
    storage quota, the API-call quota, license checks and serialization.

    SQLAlchemy applies a column's scalar ``default=`` when the row is
    INSERTed, so a freshly constructed, not-yet-flushed instance still holds
    ``None`` in those attributes.  The quota and counter helpers therefore
    fall back to the declared column default when they see ``None`` instead
    of raising ``TypeError``.
    """

    __tablename__ = "users"

    # Python-side fallbacks mirroring the column defaults declared below.
    _DEFAULT_STORAGE_LIMIT = 100 * 1024 * 1024  # 100MB
    _DEFAULT_API_CALLS_LIMIT = 1000

    # Basic user information
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(100), unique=True, nullable=True, index=True)
    password_hash = Column(String(255), nullable=False)

    # User status and flags
    is_active = Column(Boolean, default=True, nullable=False)
    is_verified = Column(Boolean, default=False, nullable=False)
    is_admin = Column(Boolean, default=False, nullable=False)

    # Profile information
    first_name = Column(String(50), nullable=True)
    last_name = Column(String(50), nullable=True)
    bio = Column(Text, nullable=True)
    avatar_url = Column(String(255), nullable=True)

    # System tracking
    last_login = Column(DateTime, nullable=True)
    login_count = Column(Integer, default=0, nullable=False)

    # Storage and limits
    # NOTE(review): Integer presumably maps to a 32-bit INT on MariaDB, which
    # would cap these byte counters at ~2 GiB -- confirm against the schema.
    storage_used = Column(Integer, default=0, nullable=False)  # bytes
    storage_limit = Column(Integer, default=_DEFAULT_STORAGE_LIMIT, nullable=False)  # 100MB default

    # TON Blockchain integration
    ton_wallet_address = Column(String(100), nullable=True, index=True)
    ton_balance = Column(Integer, default=0, nullable=False)  # nanotons

    # License and subscription
    license_key = Column(String(100), nullable=True, index=True)
    license_expires = Column(DateTime, nullable=True)
    subscription_level = Column(String(20), default="free", nullable=False)

    # API access
    api_key = Column(String(100), nullable=True, unique=True, index=True)
    api_calls_count = Column(Integer, default=0, nullable=False)
    api_calls_limit = Column(Integer, default=_DEFAULT_API_CALLS_LIMIT, nullable=False)

    # Relationships will be defined when we create content models

    # Table indexes for performance
    # NOTE(review): ton_wallet_address also declares index=True above, so
    # idx_users_ton_wallet looks like a second index on the same column --
    # confirm against the live schema before removing either one.
    __table_args__ = (
        Index('idx_users_username_active', 'username', 'is_active'),
        Index('idx_users_email_verified', 'email', 'is_verified'),
        Index('idx_users_ton_wallet', 'ton_wallet_address'),
        Index('idx_users_license', 'license_key', 'license_expires'),
    )

    def _effective_storage_limit(self) -> int:
        """Return ``storage_limit``, or the column default while it is unset."""
        # ``is None`` rather than ``or``: an explicit limit of 0 must be kept.
        if self.storage_limit is None:
            return self._DEFAULT_STORAGE_LIMIT
        return self.storage_limit

    def _effective_api_calls_limit(self) -> int:
        """Return ``api_calls_limit``, or the column default while it is unset."""
        if self.api_calls_limit is None:
            return self._DEFAULT_API_CALLS_LIMIT
        return self.api_calls_limit

    def check_storage_limit(self, file_size: int) -> bool:
        """Return True if adding ``file_size`` bytes stays within the storage limit."""
        used = self.storage_used or 0
        return (used + file_size) <= self._effective_storage_limit()

    def update_storage_usage(self, size_change: int) -> None:
        """Add ``size_change`` bytes (may be negative) to the usage, clamped at 0."""
        self.storage_used = max(0, (self.storage_used or 0) + size_change)

    def is_license_valid(self) -> bool:
        """Return True if a license key is set and its expiry lies in the future."""
        if not self.license_key or not self.license_expires:
            return False
        # datetime.utcnow() is naive; this assumes license_expires is stored
        # as naive UTC as well -- TODO confirm.
        return self.license_expires > datetime.utcnow()

    def can_make_api_call(self) -> bool:
        """Return True while the API call counter is below the limit."""
        return (self.api_calls_count or 0) < self._effective_api_calls_limit()

    def increment_api_calls(self) -> None:
        """Increment the API call counter by one."""
        self.api_calls_count = (self.api_calls_count or 0) + 1

    def reset_api_calls(self) -> None:
        """Reset the API call counter to zero (for monthly reset)."""
        self.api_calls_count = 0

    def get_storage_usage_percent(self) -> float:
        """Return storage usage as a percentage of the limit (0.0 if the limit is 0)."""
        limit = self._effective_storage_limit()
        if limit == 0:
            return 0.0
        return ((self.storage_used or 0) / limit) * 100

    def get_api_usage_percent(self) -> float:
        """Return API usage as a percentage of the limit (0.0 if the limit is 0)."""
        limit = self._effective_api_calls_limit()
        if limit == 0:
            return 0.0
        return ((self.api_calls_count or 0) / limit) * 100

    def get_display_name(self) -> str:
        """Return "first last", else the first name, else the username."""
        if self.first_name and self.last_name:
            return f"{self.first_name} {self.last_name}"
        elif self.first_name:
            return self.first_name
        return self.username

    def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]:
        """Serialize the user to a dictionary.

        Args:
            include_sensitive: When False (the default), ``password_hash``,
                ``api_key`` and ``license_key`` are passed to the base
                serializer as fields to exclude.

        Returns:
            The base model's dictionary plus the computed fields
            ``display_name``, ``storage_usage_percent``,
            ``api_usage_percent`` and ``license_valid``.
        """
        exclude = set()
        if not include_sensitive:
            exclude.update({"password_hash", "api_key", "license_key"})
        data = super().to_dict(exclude=exclude)
        # Add computed fields
        data.update({
            "display_name": self.get_display_name(),
            "storage_usage_percent": self.get_storage_usage_percent(),
            "api_usage_percent": self.get_api_usage_percent(),
            "license_valid": self.is_license_valid(),
        })
        return data

    def to_public_dict(self) -> Dict[str, Any]:
        """Return the minimal, publicly shareable view of the user."""
        return {
            "id": self.id,
            "username": self.username,
            "display_name": self.get_display_name(),
            "avatar_url": self.avatar_url,
            "is_verified": self.is_verified,
            "subscription_level": self.subscription_level,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
class UserSession(BaseModel):
    """User session model for tracking active sessions.

    SQLAlchemy applies scalar column defaults at INSERT time, so attributes
    such as ``failed_attempts`` are ``None`` on an instance that has not been
    flushed yet.  The helpers below tolerate that state.
    """

    __tablename__ = "user_sessions"

    user_id = Column(Integer, nullable=False, index=True)
    session_token = Column(String(255), unique=True, nullable=False, index=True)
    refresh_token = Column(String(255), unique=True, nullable=True, index=True)

    # Session metadata
    ip_address = Column(String(45), nullable=True)  # IPv6 support
    user_agent = Column(Text, nullable=True)
    device_info = Column(Text, nullable=True)

    # Session status
    is_active = Column(Boolean, default=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    last_activity = Column(DateTime, default=datetime.utcnow, nullable=False)

    # Security flags
    is_suspicious = Column(Boolean, default=False, nullable=False)
    failed_attempts = Column(Integer, default=0, nullable=False)

    __table_args__ = (
        Index('idx_sessions_user_active', 'user_id', 'is_active'),
        Index('idx_sessions_token', 'session_token'),
        Index('idx_sessions_expires', 'expires_at'),
    )

    def is_expired(self) -> bool:
        """Return True if the session has no expiry set or the expiry has passed."""
        # Fail closed: a session without an expiry is never treated as live.
        if self.expires_at is None:
            return True
        return datetime.utcnow() > self.expires_at

    def is_valid(self) -> bool:
        """Return True if the session is active and not expired."""
        # bool() keeps the declared return type when is_active is still None.
        return bool(self.is_active) and not self.is_expired()

    def extend_session(self, hours: int = 24) -> None:
        """Move the expiry ``hours`` ahead of now and record the activity time."""
        from datetime import timedelta

        # Sample the clock once so both fields derive from the same instant.
        now = datetime.utcnow()
        self.expires_at = now + timedelta(hours=hours)
        self.last_activity = now

    def mark_suspicious(self) -> None:
        """Flag the session as suspicious and count one more failed attempt."""
        self.is_suspicious = True
        self.failed_attempts = (self.failed_attempts or 0) + 1

    def deactivate(self) -> None:
        """Deactivate the session."""
        self.is_active = False
class UserPreferences(BaseModel):
    """Per-user settings row: UI, notifications, privacy, uploads and caching."""

    __tablename__ = "user_preferences"

    # One preferences row per user (unique user_id).
    user_id = Column(Integer, unique=True, nullable=False, index=True)

    # --- UI ---
    theme = Column(String(20), default="light", nullable=False)
    language = Column(String(10), default="en", nullable=False)
    timezone = Column(String(50), default="UTC", nullable=False)

    # --- Notifications ---
    email_notifications = Column(Boolean, default=True, nullable=False)
    upload_notifications = Column(Boolean, default=True, nullable=False)
    storage_alerts = Column(Boolean, default=True, nullable=False)

    # --- Privacy ---
    public_profile = Column(Boolean, default=False, nullable=False)
    show_email = Column(Boolean, default=False, nullable=False)
    allow_indexing = Column(Boolean, default=True, nullable=False)

    # --- Uploads ---
    auto_optimize_images = Column(Boolean, default=True, nullable=False)
    default_privacy = Column(String(20), default="private", nullable=False)
    max_file_size_mb = Column(Integer, default=10, nullable=False)

    # --- Cache and performance ---
    cache_thumbnails = Column(Boolean, default=True, nullable=False)
    preload_content = Column(Boolean, default=False, nullable=False)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the preferences via the base model, leaving out ``user_id``."""
        hidden_fields = {"user_id"}
        return super().to_dict(exclude=hidden_fields)

    @classmethod
    def get_default_preferences(cls) -> Dict[str, Any]:
        """Return a new dictionary holding the default value of every preference."""
        return dict(
            theme="light",
            language="en",
            timezone="UTC",
            email_notifications=True,
            upload_notifications=True,
            storage_alerts=True,
            public_profile=False,
            show_email=False,
            allow_indexing=True,
            auto_optimize_images=True,
            default_privacy="private",
            max_file_size_mb=10,
            cache_thumbnails=True,
            preload_content=False,
        )