uploader-bot/app/core/models/user.py

420 lines
13 KiB
Python

"""
User model with async support and enhanced security
"""
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from enum import Enum
from sqlalchemy import Column, String, BigInteger, Boolean, Integer, Index, text
from sqlalchemy.dialects.postgresql import ARRAY, JSONB
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import relationship
import structlog
from app.core.models.base import BaseModel
from app.core.config import settings
logger = structlog.get_logger(__name__)
class UserRole(str, Enum):
"""User role enumeration"""
USER = "user"
MODERATOR = "moderator"
ADMIN = "admin"
SUPER_ADMIN = "super_admin"
class UserStatus(str, Enum):
"""User status enumeration"""
ACTIVE = "active"
SUSPENDED = "suspended"
BANNED = "banned"
PENDING = "pending"
class User(BaseModel):
"""Enhanced User model with security and async support"""
__tablename__ = 'users'
# Telegram specific fields
telegram_id = Column(
BigInteger,
nullable=False,
unique=True,
index=True,
comment="Telegram user ID"
)
username = Column(
String(512),
nullable=True,
index=True,
comment="Telegram username"
)
first_name = Column(
String(256),
nullable=True,
comment="User first name"
)
last_name = Column(
String(256),
nullable=True,
comment="User last name"
)
# Localization
language_code = Column(
String(8),
nullable=False,
default="en",
comment="User language code"
)
# Security and access control
role = Column(
String(32),
nullable=False,
default=UserRole.USER.value,
index=True,
comment="User role"
)
permissions = Column(
ARRAY(String),
nullable=False,
default=list,
comment="User permissions list"
)
# Activity tracking
last_activity = Column(
"last_use", # Keep old column name for compatibility
DateTime,
nullable=False,
default=datetime.utcnow,
index=True,
comment="Last user activity timestamp"
)
login_count = Column(
Integer,
nullable=False,
default=0,
comment="Total login count"
)
# Account status
is_verified = Column(
Boolean,
nullable=False,
default=False,
comment="Whether user is verified"
)
is_premium = Column(
Boolean,
nullable=False,
default=False,
comment="Whether user has premium access"
)
# Security settings
two_factor_enabled = Column(
Boolean,
nullable=False,
default=False,
comment="Whether 2FA is enabled"
)
security_settings = Column(
JSONB,
nullable=False,
default=dict,
comment="User security settings"
)
# Preferences
preferences = Column(
JSONB,
nullable=False,
default=dict,
comment="User preferences and settings"
)
# Statistics
content_uploaded_count = Column(
Integer,
nullable=False,
default=0,
comment="Number of content items uploaded"
)
content_purchased_count = Column(
Integer,
nullable=False,
default=0,
comment="Number of content items purchased"
)
# Rate limiting
rate_limit_reset = Column(
DateTime,
nullable=True,
comment="Rate limit reset timestamp"
)
rate_limit_count = Column(
Integer,
nullable=False,
default=0,
comment="Current rate limit count"
)
# Relationships
balances = relationship('UserBalance', back_populates='user', cascade="all, delete-orphan")
transactions = relationship('InternalTransaction', back_populates='user', cascade="all, delete-orphan")
wallet_connections = relationship('WalletConnection', back_populates='user', cascade="all, delete-orphan")
content_items = relationship('UserContent', back_populates='user', cascade="all, delete-orphan")
actions = relationship('UserAction', back_populates='user', cascade="all, delete-orphan")
activities = relationship('UserActivity', back_populates='user', cascade="all, delete-orphan")
# Indexes for performance
__table_args__ = (
Index('idx_users_telegram_id', 'telegram_id'),
Index('idx_users_username', 'username'),
Index('idx_users_role_status', 'role', 'status'),
Index('idx_users_last_activity', 'last_activity'),
Index('idx_users_created_at', 'created_at'),
)
def __str__(self) -> str:
"""String representation"""
return f"User({self.id}, telegram_id={self.telegram_id}, username={self.username})"
@property
def full_name(self) -> str:
"""Get user's full name"""
parts = [self.first_name, self.last_name]
return " ".join(filter(None, parts)) or self.username or f"User_{self.telegram_id}"
@property
def display_name(self) -> str:
"""Get user's display name"""
return self.username or self.full_name
@property
def is_admin(self) -> bool:
"""Check if user is admin"""
return self.role in [UserRole.ADMIN.value, UserRole.SUPER_ADMIN.value]
@property
def is_moderator(self) -> bool:
"""Check if user is moderator or higher"""
return self.role in [UserRole.MODERATOR.value, UserRole.ADMIN.value, UserRole.SUPER_ADMIN.value]
@property
def cache_key(self) -> str:
"""Override cache key to include telegram_id"""
return f"user:telegram:{self.telegram_id}"
def has_permission(self, permission: str) -> bool:
"""Check if user has specific permission"""
if self.is_admin:
return True
return permission in (self.permissions or [])
def add_permission(self, permission: str) -> None:
"""Add permission to user"""
if not self.permissions:
self.permissions = []
if permission not in self.permissions:
self.permissions.append(permission)
def remove_permission(self, permission: str) -> None:
"""Remove permission from user"""
if self.permissions and permission in self.permissions:
self.permissions.remove(permission)
def update_activity(self) -> None:
"""Update user activity timestamp"""
self.last_activity = datetime.utcnow()
self.login_count += 1
def check_rate_limit(self, limit: int = None, window: int = None) -> bool:
"""Check if user is within rate limits"""
if self.is_admin:
return True
limit = limit or settings.RATE_LIMIT_REQUESTS
window = window or settings.RATE_LIMIT_WINDOW
now = datetime.utcnow()
# Reset counter if window has passed
if not self.rate_limit_reset or now > self.rate_limit_reset:
self.rate_limit_reset = now + timedelta(seconds=window)
self.rate_limit_count = 0
return self.rate_limit_count < limit
def increment_rate_limit(self) -> None:
"""Increment rate limit counter"""
if not self.is_admin:
self.rate_limit_count += 1
def set_preference(self, key: str, value: Any) -> None:
"""Set user preference"""
if not self.preferences:
self.preferences = {}
self.preferences[key] = value
def get_preference(self, key: str, default: Any = None) -> Any:
"""Get user preference"""
if not self.preferences:
return default
return self.preferences.get(key, default)
def set_security_setting(self, key: str, value: Any) -> None:
"""Set security setting"""
if not self.security_settings:
self.security_settings = {}
self.security_settings[key] = value
def get_security_setting(self, key: str, default: Any = None) -> Any:
"""Get security setting"""
if not self.security_settings:
return default
return self.security_settings.get(key, default)
def generate_api_token(self) -> str:
"""Generate secure API token for user"""
token_data = f"{self.id}:{self.telegram_id}:{datetime.utcnow().timestamp()}:{secrets.token_hex(16)}"
return hashlib.sha256(token_data.encode()).hexdigest()
def can_upload_content(self) -> bool:
"""Check if user can upload content"""
if self.status != UserStatus.ACTIVE.value:
return False
if not self.check_rate_limit(limit=10, window=3600): # 10 uploads per hour
return False
return True
def can_purchase_content(self) -> bool:
"""Check if user can purchase content"""
return self.status == UserStatus.ACTIVE.value
@classmethod
async def get_by_telegram_id(
cls,
session: AsyncSession,
telegram_id: int
) -> Optional['User']:
"""Get user by Telegram ID"""
try:
stmt = select(cls).where(cls.telegram_id == telegram_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
except Exception as e:
logger.error("Error getting user by telegram_id", telegram_id=telegram_id, error=str(e))
return None
@classmethod
async def get_by_username(
cls,
session: AsyncSession,
username: str
) -> Optional['User']:
"""Get user by username"""
try:
stmt = select(cls).where(cls.username == username)
result = await session.execute(stmt)
return result.scalar_one_or_none()
except Exception as e:
logger.error("Error getting user by username", username=username, error=str(e))
return None
@classmethod
async def get_active_users(
cls,
session: AsyncSession,
days: int = 30,
limit: Optional[int] = None
) -> List['User']:
"""Get active users within specified days"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=days)
stmt = select(cls).where(
cls.last_activity >= cutoff_date,
cls.status == UserStatus.ACTIVE.value
).order_by(cls.last_activity.desc())
if limit:
stmt = stmt.limit(limit)
result = await session.execute(stmt)
return result.scalars().all()
except Exception as e:
logger.error("Error getting active users", days=days, error=str(e))
return []
@classmethod
async def get_admins(cls, session: AsyncSession) -> List['User']:
"""Get all admin users"""
try:
stmt = select(cls).where(
cls.role.in_([UserRole.ADMIN.value, UserRole.SUPER_ADMIN.value])
)
result = await session.execute(stmt)
return result.scalars().all()
except Exception as e:
logger.error("Error getting admin users", error=str(e))
return []
@classmethod
async def create_from_telegram(
cls,
session: AsyncSession,
telegram_id: int,
username: Optional[str] = None,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
language_code: str = "en"
) -> 'User':
"""Create user from Telegram data"""
user = cls(
telegram_id=telegram_id,
username=username,
first_name=first_name,
last_name=last_name,
language_code=language_code,
status=UserStatus.ACTIVE.value
)
session.add(user)
await session.commit()
await session.refresh(user)
logger.info("User created from Telegram", telegram_id=telegram_id, user_id=user.id)
return user
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary with safe data"""
data = super().to_dict()
# Remove sensitive fields
sensitive_fields = ['security_settings', 'permissions']
for field in sensitive_fields:
data.pop(field, None)
return data
def to_public_dict(self) -> Dict[str, Any]:
"""Convert to public dictionary with minimal data"""
return {
'id': str(self.id),
'username': self.username,
'display_name': self.display_name,
'is_verified': self.is_verified,
'is_premium': self.is_premium,
'created_at': self.created_at.isoformat() if self.created_at else None
}