# uploader-bot/app/core/models/user/user.py
#
# 629 lines
# 18 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
User model with async support and enhanced security
"""
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from enum import Enum
from sqlalchemy import Column, String, BigInteger, Boolean, Integer, Index, text, DateTime
from sqlalchemy.dialects.postgresql import ARRAY, JSONB
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import relationship
import structlog
from app.core.models.base import BaseModel
from app.core.config import settings
logger = structlog.get_logger(__name__)
class UserRole(str, Enum):
    """User role enumeration.

    Members subclass ``str``, so each member compares equal to its raw value
    (for example ``UserRole.ADMIN == "admin"``).
    """

    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"


# The module-level name ``UserRole`` is rebound later in this file by an ORM
# model class of the same name. This alias keeps the enumeration reachable
# after that rebinding without changing either existing name.
UserRoleEnum = UserRole
class UserStatus(str, Enum):
    """Account status values stored as plain strings.

    Because the enum mixes in ``str``, a member compares equal to its raw
    value, e.g. ``UserStatus.ACTIVE == "active"``.
    """

    ACTIVE = "active"
    SUSPENDED = "suspended"
    BANNED = "banned"
    PENDING = "pending"
class User(BaseModel):
    """Enhanced User model with security and async support.

    The ``id``, ``created_at`` and ``status`` attributes used by the methods
    below are not declared in this class; presumably they are inherited from
    ``BaseModel`` (TODO confirm).
    """

    __tablename__ = 'users'

    # Role values are captured while the class body executes. The module-level
    # name ``UserRole`` is rebound further down in this file by an ORM model of
    # the same name, so looking up ``UserRole.ADMIN`` lazily inside a method
    # would reach that model class (which has no such members) and fail.
    _ADMIN_ROLES = (UserRole.ADMIN.value, UserRole.SUPER_ADMIN.value)
    _MODERATOR_ROLES = (UserRole.MODERATOR.value,) + _ADMIN_ROLES

    # Telegram specific fields
    telegram_id = Column(
        BigInteger,
        nullable=False,
        unique=True,
        index=True,
        comment="Telegram user ID"
    )
    username = Column(
        String(512),
        nullable=True,
        index=True,
        comment="Telegram username"
    )
    first_name = Column(
        String(256),
        nullable=True,
        comment="User first name"
    )
    last_name = Column(
        String(256),
        nullable=True,
        comment="User last name"
    )

    # Localization
    language_code = Column(
        String(8),
        nullable=False,
        default="en",
        comment="User language code"
    )

    # Security and access control
    role = Column(
        String(32),
        nullable=False,
        default=UserRole.USER.value,
        index=True,
        comment="User role"
    )
    permissions = Column(
        ARRAY(String),
        nullable=False,
        default=list,
        comment="User permissions list"
    )

    # Activity tracking
    last_activity = Column(
        "last_use",  # Keep old column name for compatibility
        DateTime,
        nullable=False,
        default=datetime.utcnow,
        index=True,
        comment="Last user activity timestamp"
    )
    login_count = Column(
        Integer,
        nullable=False,
        default=0,
        comment="Total login count"
    )

    # Account status
    is_verified = Column(
        Boolean,
        nullable=False,
        default=False,
        comment="Whether user is verified"
    )
    is_premium = Column(
        Boolean,
        nullable=False,
        default=False,
        comment="Whether user has premium access"
    )

    # Security settings
    two_factor_enabled = Column(
        Boolean,
        nullable=False,
        default=False,
        comment="Whether 2FA is enabled"
    )
    security_settings = Column(
        JSONB,
        nullable=False,
        default=dict,
        comment="User security settings"
    )

    # Preferences
    preferences = Column(
        JSONB,
        nullable=False,
        default=dict,
        comment="User preferences and settings"
    )

    # Statistics
    content_uploaded_count = Column(
        Integer,
        nullable=False,
        default=0,
        comment="Number of content items uploaded"
    )
    content_purchased_count = Column(
        Integer,
        nullable=False,
        default=0,
        comment="Number of content items purchased"
    )

    # Rate limiting
    rate_limit_reset = Column(
        DateTime,
        nullable=True,
        comment="Rate limit reset timestamp"
    )
    rate_limit_count = Column(
        Integer,
        nullable=False,
        default=0,
        comment="Current rate limit count"
    )

    # Relationships
    balances = relationship('UserBalance', back_populates='user', cascade="all, delete-orphan")
    transactions = relationship('InternalTransaction', back_populates='user', cascade="all, delete-orphan")
    wallet_connections = relationship('WalletConnection', back_populates='user', cascade="all, delete-orphan")
    content_items = relationship('UserContent', back_populates='user', cascade="all, delete-orphan")
    actions = relationship('UserAction', back_populates='user', cascade="all, delete-orphan")
    activities = relationship('UserActivity', back_populates='user', cascade="all, delete-orphan")

    # Indexes for performance
    # NOTE(review): telegram_id, username and last_use also set index=True on
    # their Column definitions, so the explicit entries below appear to
    # duplicate those indexes. Left untouched because removing either side
    # changes the schema; confirm against the migrations before cleaning up.
    __table_args__ = (
        Index('idx_users_telegram_id', 'telegram_id'),
        Index('idx_users_username', 'username'),
        Index('idx_users_role_status', 'role', 'status'),
        Index('idx_users_last_activity', 'last_use'),  # Use actual database column name
        Index('idx_users_created_at', 'created_at'),
    )

    def __str__(self) -> str:
        """Return a short debug representation with id, telegram_id and username."""
        return f"User({self.id}, telegram_id={self.telegram_id}, username={self.username})"

    @property
    def full_name(self) -> str:
        """Return first and last name joined by a space.

        Falls back to the username, then to ``User_<telegram_id>``, when both
        name parts are empty.
        """
        parts = [self.first_name, self.last_name]
        return " ".join(filter(None, parts)) or self.username or f"User_{self.telegram_id}"

    @property
    def display_name(self) -> str:
        """Return the username if set, otherwise ``full_name``."""
        return self.username or self.full_name

    @property
    def is_admin(self) -> bool:
        """Return True when the role is ``admin`` or ``super_admin``."""
        return self.role in self._ADMIN_ROLES

    @property
    def is_moderator(self) -> bool:
        """Return True when the role is ``moderator``, ``admin`` or ``super_admin``."""
        return self.role in self._MODERATOR_ROLES

    @property
    def cache_key(self) -> str:
        """Return the cache key for this user, built from ``telegram_id``."""
        return f"user:telegram:{self.telegram_id}"

    def has_permission(self, permission: str) -> bool:
        """Return True for admins, or when ``permission`` is in the user's list."""
        if self.is_admin:
            return True
        return permission in (self.permissions or [])

    def add_permission(self, permission: str) -> None:
        """Grant ``permission`` to the user if it is not already present.

        A new list is assigned instead of appending in place: a plain
        ``ARRAY`` column does not track in-place mutations, so an ``append``
        on the loaded list would not be written on flush.
        """
        current = list(self.permissions or [])
        if permission not in current:
            self.permissions = current + [permission]

    def remove_permission(self, permission: str) -> None:
        """Revoke ``permission`` from the user if it is present.

        Assigns a new list for the same change-tracking reason as
        ``add_permission``.
        """
        current = self.permissions or []
        if permission in current:
            self.permissions = [p for p in current if p != permission]

    def update_activity(self) -> None:
        """Set ``last_activity`` to the current UTC time and bump ``login_count``."""
        self.last_activity = datetime.utcnow()
        # Column defaults are only applied on INSERT, so the counter can still
        # be None on an instance that has not been flushed yet.
        self.login_count = (self.login_count or 0) + 1

    def check_rate_limit(self, limit: Optional[int] = None, window: Optional[int] = None) -> bool:
        """Return True while the user is below the request limit.

        Admins are never limited. The counter is reset (and a new window
        started) when no reset time is stored or the stored one has passed.
        This method does not increment the counter; see
        ``increment_rate_limit``.

        Args:
            limit: Maximum number of requests per window. ``None`` or ``0``
                falls back to ``settings.RATE_LIMIT_REQUESTS``.
            window: Window length in seconds. ``None`` or ``0`` falls back to
                ``settings.RATE_LIMIT_WINDOW``.
        """
        if self.is_admin:
            return True
        limit = limit or settings.RATE_LIMIT_REQUESTS
        window = window or settings.RATE_LIMIT_WINDOW
        now = datetime.utcnow()
        # Reset counter if window has passed
        if not self.rate_limit_reset or now > self.rate_limit_reset:
            self.rate_limit_reset = now + timedelta(seconds=window)
            self.rate_limit_count = 0
        # None-safe: the counter is unset on instances that were never flushed.
        return (self.rate_limit_count or 0) < limit

    def increment_rate_limit(self) -> None:
        """Increment the rate limit counter (no-op for admins)."""
        if not self.is_admin:
            self.rate_limit_count = (self.rate_limit_count or 0) + 1

    def set_preference(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the user's preferences.

        A new dict is assigned because a plain ``JSONB`` column does not track
        in-place mutations; ``self.preferences[key] = value`` on a loaded dict
        would not be written on flush.
        """
        self.preferences = {**(self.preferences or {}), key: value}

    def get_preference(self, key: str, default: Any = None) -> Any:
        """Return the preference stored under ``key``, or ``default``."""
        if not self.preferences:
            return default
        return self.preferences.get(key, default)

    def set_security_setting(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the user's security settings.

        Assigns a new dict for the same change-tracking reason as
        ``set_preference``.
        """
        self.security_settings = {**(self.security_settings or {}), key: value}

    def get_security_setting(self, key: str, default: Any = None) -> Any:
        """Return the security setting stored under ``key``, or ``default``."""
        if not self.security_settings:
            return default
        return self.security_settings.get(key, default)

    def generate_api_token(self) -> str:
        """Return a fresh SHA-256 hex digest to use as an API token.

        The digest covers the user id, the Telegram id, the current UTC
        timestamp and 16 random bytes. This method does not store the token.
        """
        token_data = f"{self.id}:{self.telegram_id}:{datetime.utcnow().timestamp()}:{secrets.token_hex(16)}"
        return hashlib.sha256(token_data.encode()).hexdigest()

    def verify_token(self, token_hash: str) -> bool:
        """Return True when the user's status is active.

        SECURITY NOTE(review): ``token_hash`` is NOT checked. No token is
        stored for comparison, so any value is accepted for an active user.
        This matches the previous behaviour, which was kept for compatibility;
        a real implementation should store token hashes in the database and
        compare them in constant time.
        """
        try:
            # Kept for compatibility: succeed whenever the user is active.
            return self.status == UserStatus.ACTIVE.value
        except Exception as e:
            logger.error("Error verifying token", user_id=self.id, error=str(e))
            return False

    @property
    def is_active(self) -> bool:
        """Return True when the user's status is active."""
        return self.status == UserStatus.ACTIVE.value

    def can_upload_content(self) -> bool:
        """Return True when the user is active and below the upload limit.

        NOTE(review): this uses the same ``rate_limit_*`` fields as
        ``check_rate_limit`` calls made with other limits, so the counter and
        window appear to be shared with general request limiting — confirm
        this is intended.
        """
        if self.status != UserStatus.ACTIVE.value:
            return False
        if not self.check_rate_limit(limit=10, window=3600):  # 10 uploads per hour
            return False
        return True

    def can_purchase_content(self) -> bool:
        """Return True when the user's status is active."""
        return self.status == UserStatus.ACTIVE.value

    @classmethod
    async def get_by_telegram_id(
        cls,
        session: AsyncSession,
        telegram_id: int
    ) -> Optional['User']:
        """Return the user with the given Telegram ID.

        Returns None when no row matches, and also when the query raises; the
        error is logged in that case.
        """
        try:
            stmt = select(cls).where(cls.telegram_id == telegram_id)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()
        except Exception as e:
            logger.error("Error getting user by telegram_id", telegram_id=telegram_id, error=str(e))
            return None

    @classmethod
    async def get_by_username(
        cls,
        session: AsyncSession,
        username: str
    ) -> Optional['User']:
        """Return the user with the given username.

        Returns None when no row matches, and also when the query raises
        (including when several rows match); the error is logged in that case.
        """
        try:
            stmt = select(cls).where(cls.username == username)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()
        except Exception as e:
            logger.error("Error getting user by username", username=username, error=str(e))
            return None

    @classmethod
    async def get_by_id(
        cls,
        session: AsyncSession,
        user_id
    ) -> Optional['User']:
        """Return the user whose ``id`` equals ``user_id``.

        Returns None when no row matches, and also when the query raises; the
        error is logged in that case.
        """
        try:
            stmt = select(cls).where(cls.id == user_id)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()
        except Exception as e:
            logger.error("Error getting user by id", user_id=user_id, error=str(e))
            return None

    @classmethod
    async def get_active_users(
        cls,
        session: AsyncSession,
        days: int = 30,
        limit: Optional[int] = None
    ) -> List['User']:
        """Return active-status users seen within the last ``days`` days.

        Results are ordered by most recent activity first. ``limit`` caps the
        number of rows; ``None`` or ``0`` means no cap. Returns an empty list
        when the query raises; the error is logged in that case.
        """
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=days)
            stmt = select(cls).where(
                cls.last_activity >= cutoff_date,
                cls.status == UserStatus.ACTIVE.value
            ).order_by(cls.last_activity.desc())
            if limit:
                stmt = stmt.limit(limit)
            result = await session.execute(stmt)
            return list(result.scalars().all())
        except Exception as e:
            logger.error("Error getting active users", days=days, error=str(e))
            return []

    @classmethod
    async def get_admins(cls, session: AsyncSession) -> List['User']:
        """Return all users whose role is ``admin`` or ``super_admin``.

        Returns an empty list when the query raises; the error is logged in
        that case.
        """
        try:
            stmt = select(cls).where(cls.role.in_(list(cls._ADMIN_ROLES)))
            result = await session.execute(stmt)
            return list(result.scalars().all())
        except Exception as e:
            logger.error("Error getting admin users", error=str(e))
            return []

    @classmethod
    async def create_from_telegram(
        cls,
        session: AsyncSession,
        telegram_id: int,
        username: Optional[str] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        language_code: str = "en"
    ) -> 'User':
        """Create, commit and return a new active user from Telegram data.

        The session is committed and the instance refreshed before returning.
        Exceptions raised by the commit are not caught here.
        """
        user = cls(
            telegram_id=telegram_id,
            username=username,
            first_name=first_name,
            last_name=last_name,
            language_code=language_code,
            status=UserStatus.ACTIVE.value
        )
        session.add(user)
        await session.commit()
        await session.refresh(user)
        logger.info("User created from Telegram", telegram_id=telegram_id, user_id=user.id)
        return user

    def to_dict(self) -> Dict[str, Any]:
        """Return the base dictionary form without the sensitive fields."""
        data = super().to_dict()
        # Remove sensitive fields
        sensitive_fields = ['security_settings', 'permissions']
        for field in sensitive_fields:
            data.pop(field, None)
        return data

    def to_public_dict(self) -> Dict[str, Any]:
        """Return the minimal set of fields that is safe to expose publicly."""
        return {
            'id': str(self.id),
            'username': self.username,
            'display_name': self.display_name,
            'is_verified': self.is_verified,
            'is_premium': self.is_premium,
            'created_at': self.created_at.isoformat() if self.created_at else None
        }
class UserSession(BaseModel):
    """User session model for authentication tracking."""

    __tablename__ = 'user_sessions'

    user_id = Column(
        BigInteger,
        nullable=False,
        index=True,
        comment="Associated user ID"
    )
    refresh_token_hash = Column(
        String(255),
        nullable=False,
        comment="Hashed refresh token"
    )
    ip_address = Column(
        String(45),
        nullable=True,
        comment="Session IP address"
    )
    user_agent = Column(
        String(512),
        nullable=True,
        comment="User agent string"
    )
    expires_at = Column(
        DateTime,
        nullable=False,
        comment="Session expiration time"
    )
    last_used_at = Column(
        DateTime,
        nullable=True,
        comment="Last time session was used"
    )
    logged_out_at = Column(
        DateTime,
        nullable=True,
        comment="Session logout time"
    )
    is_active = Column(
        Boolean,
        nullable=False,
        default=True,
        comment="Whether session is active"
    )
    remember_me = Column(
        Boolean,
        nullable=False,
        default=False,
        comment="Whether this is a remember me session"
    )

    # Indexes for performance
    # NOTE(review): user_id also sets index=True on its Column, so the first
    # entry appears to duplicate that index. Left untouched because removing
    # either side changes the schema; confirm against the migrations.
    __table_args__ = (
        Index('idx_user_sessions_user_id', 'user_id'),
        Index('idx_user_sessions_expires_at', 'expires_at'),
        Index('idx_user_sessions_active', 'is_active'),
    )

    def __str__(self) -> str:
        """Return a short debug representation with id, user_id and active flag."""
        return f"UserSession({self.id}, user_id={self.user_id}, active={self.is_active})"

    def is_expired(self) -> bool:
        """Return True when the current UTC time is past ``expires_at``.

        A session without an expiry (possible on an instance that has not been
        populated yet) is reported as expired, so callers fail closed instead
        of hitting a ``TypeError`` on the comparison.
        """
        expires_at = self.expires_at
        if expires_at is None:
            return True
        return datetime.utcnow() > expires_at

    def is_valid(self) -> bool:
        """Return True when the session is active, unexpired and not logged out.

        ``is_active`` is coerced to ``bool`` so the result is always a real
        boolean, even when the flag is still unset (None).
        """
        return bool(self.is_active) and not self.is_expired() and not self.logged_out_at
class UserRole(BaseModel):
    """Persistent role record holding a named set of permissions.

    Note: this class statement rebinds the module-level name ``UserRole``,
    which earlier in this file refers to the ``str``/``Enum`` role
    enumeration.
    """

    __tablename__ = 'user_roles'

    name = Column(String(64), comment="Role name", index=True, unique=True, nullable=False)
    description = Column(String(255), comment="Role description", nullable=True)
    permissions = Column(
        ARRAY(String), comment="Role permissions list", default=list, nullable=False
    )
    is_system = Column(
        Boolean, comment="Whether this is a system role", default=False, nullable=False
    )

    def __str__(self) -> str:
        """Return ``UserRole(<name>)``."""
        role_name = self.name
        return f"UserRole({role_name})"

    def has_permission(self, permission: str) -> bool:
        """Return True when ``permission`` is listed on this role."""
        granted = self.permissions if self.permissions else []
        return permission in granted
class ApiKey(BaseModel):
    """Hashed API key record used for programmatic access."""

    __tablename__ = 'api_keys'

    user_id = Column(BigInteger, comment="Associated user ID", index=True, nullable=False)
    name = Column(String(128), comment="API key name", nullable=False)
    key_hash = Column(String(255), comment="Hashed API key", unique=True, nullable=False)
    permissions = Column(
        ARRAY(String), comment="API key permissions", default=list, nullable=False
    )
    expires_at = Column(DateTime, comment="API key expiration time", nullable=True)
    last_used_at = Column(DateTime, comment="Last time key was used", nullable=True)
    is_active = Column(Boolean, comment="Whether key is active", default=True, nullable=False)

    # Secondary indexes on the lookup columns.
    __table_args__ = (
        Index('idx_api_keys_user_id', 'user_id'),
        Index('idx_api_keys_hash', 'key_hash'),
        Index('idx_api_keys_active', 'is_active'),
    )

    def __str__(self) -> str:
        """Return a short debug representation with id, name and user_id."""
        key_id, key_name, owner_id = self.id, self.name, self.user_id
        return f"ApiKey({key_id}, name={key_name}, user_id={owner_id})"

    def is_expired(self) -> bool:
        """Return True when an expiry is set and the current UTC time is past it.

        A key without ``expires_at`` never expires.
        """
        return bool(self.expires_at) and datetime.utcnow() > self.expires_at

    def is_valid(self) -> bool:
        """Return a truthy value when the key is active and not expired."""
        active = self.is_active
        return active and not self.is_expired()