# uploader-bot/app/api/routes/auth_routes.py
# Source listing metadata: 870 lines, 31 KiB, Python

"""
Authentication and authorization routes with JWT tokens, user management, and security features.
Provides user registration, login, token refresh, and account management with comprehensive validation.
"""
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from uuid import UUID, uuid4
from sanic import Blueprint, Request, response
from sanic.response import JSONResponse
from sqlalchemy import select, update, and_, or_
from sqlalchemy.orm import selectinload
from app.core.config import get_settings
from app.core.database import db_manager, get_cache_manager
from app.core.logging import get_logger
from app.core.models.user import User, UserSession, UserRole
from app.core.security import (
hash_password, verify_password, generate_access_token,
verify_access_token, generate_refresh_token, generate_api_key,
sanitize_input, generate_csrf_token
)
from app.api.middleware import require_auth, validate_request, rate_limit
from app.core.validation import (
UserRegistrationSchema, UserLoginSchema, UserUpdateSchema,
ApiKeySchema
)
# Initialize blueprint
# Every route registered on this blueprint is served under the /api/v1/auth prefix.
auth_bp = Blueprint("auth", url_prefix="/api/v1/auth")
# Logger created with this module's name; the handlers below call its async
# methods (ainfo / awarning / aerror / adebug).
logger = get_logger(__name__)
# Settings are fetched once at import time; the handlers read
# ACCESS_TOKEN_EXPIRE_MINUTES and REFRESH_TOKEN_EXPIRE_DAYS from this object.
settings = get_settings()
@auth_bp.route("/register", methods=["POST"])
@rate_limit(limit=5, window=3600)  # 5 registrations per hour
@validate_request(UserRegistrationSchema)
async def register_user(request: Request) -> JSONResponse:
    """
    Register a new user, attach the default role and open an initial session.

    Args:
        request: Sanic request whose JSON body carries ``username``, ``email``,
            ``password`` and optionally ``full_name``.

    Returns:
        JSONResponse: 201 with user data, tokens and session identifiers on
        success; 400 if the username or email is already taken; 429 if the
        per-IP registration limit is reached; 500 on any unexpected error.
    """
    # Bound before the ``try`` so the error handler can always read it, even
    # when reading the request body is the step that failed.
    data = {}
    try:
        data = request.json or {}

        # X-Forwarded-For may hold a comma-separated proxy chain; keep only the
        # first entry so a single address is stored and used as the cache key.
        # NOTE(review): the header is client-supplied unless a trusted proxy
        # overwrites it, so the per-IP limit below can be spoofed - confirm the
        # deployment sets or strips it.
        forwarded_for = request.headers.get("X-Forwarded-For") or ""
        client_ip = forwarded_for.split(",")[0].strip() or request.remote_addr

        # Sanitize input data
        username = sanitize_input(data["username"])
        email = sanitize_input(data["email"])
        full_name = sanitize_input(data.get("full_name", ""))

        async with db_manager.get_session() as session:
            # Check if username already exists
            username_stmt = select(User).where(User.username == username)
            username_result = await session.execute(username_stmt)
            if username_result.scalar_one_or_none():
                return response.json(
                    {"error": "Username already exists", "code": "USERNAME_EXISTS"},
                    status=400
                )

            # Check if email already exists
            email_stmt = select(User).where(User.email == email)
            email_result = await session.execute(email_stmt)
            if email_result.scalar_one_or_none():
                return response.json(
                    {"error": "Email already registered", "code": "EMAIL_EXISTS"},
                    status=400
                )

            # Check registration rate limiting by IP
            cache_manager = get_cache_manager()
            ip_reg_key = f"registration_ip:{client_ip}"
            ip_registrations = await cache_manager.get(ip_reg_key, default=0)
            if ip_registrations >= 3:  # Max 3 registrations per IP per day
                return response.json(
                    {"error": "Too many registrations from this IP", "code": "IP_LIMIT_EXCEEDED"},
                    status=429
                )

            # Resolve the default role BEFORE creating the user so that the
            # user row and its role link are written by a single commit. The
            # role is attached while the user object is still new, which avoids
            # touching the ``roles`` relationship of an already-persisted
            # object inside the async session.
            default_role_stmt = select(UserRole).where(UserRole.name == "user")
            role_result = await session.execute(default_role_stmt)
            default_role = role_result.scalar_one_or_none()

            # Create user
            new_user = User(
                id=uuid4(),
                username=username,
                email=email,
                password_hash=hash_password(data["password"]),
                full_name=full_name,
                is_active=True,
                email_verified=False,  # Require email verification
                registration_ip=client_ip,
                last_login_ip=client_ip,
                settings={"theme": "light", "notifications": True}
            )
            if default_role:
                new_user.roles.append(default_role)
            session.add(new_user)
            await session.commit()
            await session.refresh(new_user)

            # Snapshot the values needed later while they are freshly loaded,
            # so nothing below has to read ORM attributes after the session
            # has been closed.
            user_id = new_user.id
            created_at = new_user.created_at

            # Update IP registration counter
            await cache_manager.increment(ip_reg_key, ttl=86400)  # 24 hours

        # Generate tokens
        access_token = generate_access_token(
            {"user_id": str(user_id), "username": username},
            expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
        )
        refresh_token = generate_refresh_token(user_id)

        # Create user session
        session_id = str(uuid4())
        csrf_token = generate_csrf_token(user_id, session_id)
        async with db_manager.get_session() as session:
            user_session = UserSession(
                id=UUID(session_id),
                user_id=user_id,
                refresh_token_hash=hash_password(refresh_token[-32:]),  # Hash last 32 chars
                ip_address=client_ip,
                user_agent=request.headers.get("User-Agent", ""),
                expires_at=datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
            )
            session.add(user_session)
            await session.commit()

        await logger.ainfo(
            "User registered successfully",
            user_id=str(user_id),
            username=username,
            email=email,
            ip=client_ip
        )
        return response.json({
            "message": "Registration successful",
            "user": {
                "id": str(user_id),
                "username": username,
                "email": email,
                "full_name": full_name,
                "created_at": created_at.isoformat()
            },
            "tokens": {
                "access_token": access_token,
                "refresh_token": refresh_token,
                "token_type": "Bearer",
                "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
            },
            "session": {
                "session_id": session_id,
                "csrf_token": csrf_token
            }
        }, status=201)
    except Exception as e:
        await logger.aerror(
            "User registration failed",
            username=data.get("username"),
            email=data.get("email"),
            error=str(e)
        )
        return response.json(
            {"error": "Registration failed", "code": "REGISTRATION_FAILED"},
            status=500
        )
@auth_bp.route("/login", methods=["POST"])
@rate_limit(limit=10, window=900)  # 10 login attempts per 15 minutes
@validate_request(UserLoginSchema)
async def login_user(request: Request) -> JSONResponse:
    """
    Authenticate a user by username or email and issue tokens plus a session.

    Args:
        request: Sanic request whose JSON body carries ``username`` (username
            or email), ``password`` and optionally ``remember_me``.

    Returns:
        JSONResponse: 200 with user data, tokens and session identifiers on
        success; 401 for invalid credentials; 403 for a deactivated account;
        429 after 5 failed attempts for the same identifier and IP; 500 on
        any unexpected error.
    """
    # Bound before the ``try`` so the error handler can always read it.
    data = {}
    try:
        data = request.json or {}
        username_or_email = sanitize_input(data["username"])
        password = data["password"]
        remember_me = data.get("remember_me", False)

        # X-Forwarded-For may hold a comma-separated proxy chain; keep only the
        # first entry so the throttle key and stored IP are a single address.
        # NOTE(review): the header is client-supplied unless a trusted proxy
        # overwrites it - confirm the deployment sets or strips it.
        forwarded_for = request.headers.get("X-Forwarded-For") or ""
        client_ip = forwarded_for.split(",")[0].strip() or request.remote_addr

        # Check login rate limiting
        cache_manager = get_cache_manager()
        login_key = f"login_attempts:{username_or_email}:{client_ip}"
        attempts = await cache_manager.get(login_key, default=0)
        if attempts >= 5:  # Max 5 failed attempts
            return response.json(
                {"error": "Too many login attempts", "code": "LOGIN_BLOCKED"},
                status=429
            )

        async with db_manager.get_session() as session:
            # Find user by username or email
            user_stmt = select(User).where(
                or_(User.username == username_or_email, User.email == username_or_email)
            ).options(selectinload(User.roles))
            user_result = await session.execute(user_stmt)
            user = user_result.scalar_one_or_none()

            if not user or not verify_password(password, user.password_hash):
                # Increment failed attempts
                await cache_manager.increment(login_key, ttl=900)  # 15 minutes
                await logger.awarning(
                    "Failed login attempt",
                    username=username_or_email,
                    ip=client_ip,
                    attempts=attempts + 1
                )
                return response.json(
                    {"error": "Invalid credentials", "code": "INVALID_CREDENTIALS"},
                    status=401
                )

            if not user.is_active:
                return response.json(
                    {"error": "Account deactivated", "code": "ACCOUNT_DEACTIVATED"},
                    status=403
                )

            # Successful login - clear failed attempts
            await cache_manager.delete(login_key)

            # Collect permissions once, de-duplicated in first-seen order, so
            # the token and the response body carry the same list. A role
            # without permissions contributes nothing.
            user_permissions = list(dict.fromkeys(
                permission
                for role in user.roles
                for permission in (role.permissions or [])
            ))

            # Update user login info
            user.last_login_at = datetime.utcnow()
            user.last_login_ip = client_ip
            user.login_count = (user.login_count or 0) + 1

            # Snapshot everything needed after the commit so the rest of the
            # handler does not depend on the session's expire-on-commit
            # behaviour (its configuration is not visible in this file).
            user_id = user.id
            username = user.username
            user_email = user.email
            user_full_name = user.full_name
            last_login_at = user.last_login_at

            await session.commit()

        # Generate tokens
        token_payload = {
            "user_id": str(user_id),
            "username": username,
            "permissions": user_permissions
        }
        expires_in = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
        if remember_me:
            expires_in *= 24  # 24x longer for remember me
        access_token = generate_access_token(token_payload, expires_in=expires_in)
        refresh_token = generate_refresh_token(user_id)

        # Create user session
        session_id = str(uuid4())
        csrf_token = generate_csrf_token(user_id, session_id)
        refresh_expires = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
        if remember_me:
            refresh_expires *= 2  # Longer refresh for remember me
        async with db_manager.get_session() as session:
            user_session = UserSession(
                id=UUID(session_id),
                user_id=user_id,
                refresh_token_hash=hash_password(refresh_token[-32:]),
                ip_address=client_ip,
                user_agent=request.headers.get("User-Agent", ""),
                expires_at=datetime.utcnow() + refresh_expires,
                remember_me=remember_me
            )
            session.add(user_session)
            await session.commit()

        await logger.ainfo(
            "User logged in successfully",
            user_id=str(user_id),
            username=username,
            ip=client_ip,
            remember_me=remember_me
        )
        return response.json({
            "message": "Login successful",
            "user": {
                "id": str(user_id),
                "username": username,
                "email": user_email,
                "full_name": user_full_name,
                "last_login": last_login_at.isoformat() if last_login_at else None,
                "permissions": user_permissions
            },
            "tokens": {
                "access_token": access_token,
                "refresh_token": refresh_token,
                "token_type": "Bearer",
                "expires_in": expires_in
            },
            "session": {
                "session_id": session_id,
                "csrf_token": csrf_token
            }
        })
    except Exception as e:
        await logger.aerror(
            "Login failed",
            username=data.get("username"),
            error=str(e)
        )
        return response.json(
            {"error": "Login failed", "code": "LOGIN_FAILED"},
            status=500
        )
@auth_bp.route("/refresh", methods=["POST"])
@rate_limit(limit=50, window=3600)  # 50 refresh attempts per hour
async def refresh_tokens(request: Request) -> JSONResponse:
    """
    Exchange a refresh token for a new access token and a rotated refresh token.

    Args:
        request: Sanic request whose JSON body carries ``refresh_token``.

    Returns:
        JSONResponse: 200 with the new token pair; 400 if no refresh token is
        supplied; 401 if the token fails verification, no active unexpired
        session matches it, or the user is missing or inactive; 500 on any
        unexpected error.
    """
    try:
        # The body may be absent or not a JSON object; treat both as
        # "no token supplied" instead of failing with a 500.
        body = request.json
        refresh_token = body.get("refresh_token") if isinstance(body, dict) else None
        if not refresh_token or not isinstance(refresh_token, str):
            return response.json(
                {"error": "Refresh token required", "code": "TOKEN_REQUIRED"},
                status=400
            )

        # Verify refresh token
        payload = verify_access_token(refresh_token, token_type="refresh")
        if not payload:
            return response.json(
                {"error": "Invalid refresh token", "code": "INVALID_TOKEN"},
                status=401
            )

        # A verified token with a missing or malformed user id is still an
        # invalid token from the caller's point of view.
        try:
            user_id = UUID(payload["user_id"])
        except (KeyError, TypeError, ValueError, AttributeError):
            return response.json(
                {"error": "Invalid refresh token", "code": "INVALID_TOKEN"},
                status=401
            )

        token_tail = refresh_token[-32:]  # Only the last 32 chars are hashed/stored

        async with db_manager.get_session() as session:
            # Load the user's live sessions and match the token with
            # verify_password(). Re-hashing the token and comparing for
            # equality in SQL only works for an unsalted hash; if
            # hash_password() salts its output (as password hashers usually
            # do) that comparison can never match.
            sessions_stmt = select(UserSession).where(
                and_(
                    UserSession.user_id == user_id,
                    UserSession.expires_at > datetime.utcnow(),
                    UserSession.is_active == True  # noqa: E712 - SQL expression
                )
            )
            sessions_result = await session.execute(sessions_stmt)
            user_session = next(
                (
                    candidate
                    for candidate in sessions_result.scalars().all()
                    if candidate.refresh_token_hash
                    and verify_password(token_tail, candidate.refresh_token_hash)
                ),
                None
            )
            if not user_session:
                return response.json(
                    {"error": "Session expired or invalid", "code": "SESSION_INVALID"},
                    status=401
                )

            # Get user with permissions
            user_stmt = select(User).where(User.id == user_id).options(selectinload(User.roles))
            user_result = await session.execute(user_stmt)
            user = user_result.scalar_one_or_none()
            if not user or not user.is_active:
                return response.json(
                    {"error": "User not found or inactive", "code": "USER_INACTIVE"},
                    status=401
                )

            # Generate new tokens (token rotation)
            user_permissions = []
            for role in user.roles:
                user_permissions.extend(role.permissions or [])
            new_access_token = generate_access_token(
                {
                    "user_id": str(user.id),
                    "username": user.username,
                    "permissions": list(set(user_permissions))
                },
                expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
            )
            new_refresh_token = generate_refresh_token(user.id)

            # Read the id before committing so logging does not have to touch
            # ORM attributes afterwards.
            rotated_session_id = str(user_session.id)

            # Update session with new refresh token; the old one stops
            # matching as soon as this commit lands.
            user_session.refresh_token_hash = hash_password(new_refresh_token[-32:])
            user_session.last_used_at = datetime.utcnow()
            await session.commit()

        await logger.adebug(
            "Tokens refreshed",
            user_id=str(user_id),
            session_id=rotated_session_id
        )
        return response.json({
            "tokens": {
                "access_token": new_access_token,
                "refresh_token": new_refresh_token,
                "token_type": "Bearer",
                "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
            }
        })
    except Exception as e:
        await logger.aerror("Token refresh failed", error=str(e))
        return response.json(
            {"error": "Token refresh failed", "code": "REFRESH_FAILED"},
            status=500
        )
@auth_bp.route("/logout", methods=["POST"])
@require_auth()
async def logout_user(request: Request) -> JSONResponse:
    """
    Log the authenticated user out, deactivating the session named in the
    ``X-Session-ID`` header when one is supplied.

    Args:
        request: Sanic request; ``request.ctx.user`` identifies the caller and
            the optional ``X-Session-ID`` header names the session to end.

    Returns:
        JSONResponse: 200 with a confirmation and timestamp; 400 if
        ``X-Session-ID`` is present but not a valid UUID; 500 on any
        unexpected error.
    """
    try:
        user_id = request.ctx.user.id
        session_id = request.headers.get("X-Session-ID")
        if session_id:
            # A malformed header is a client error; without this guard the
            # UUID() call raised and the request ended as a 500.
            try:
                session_uuid = UUID(session_id)
            except ValueError:
                return response.json(
                    {"error": "Invalid session ID", "code": "INVALID_SESSION_ID"},
                    status=400
                )

            async with db_manager.get_session() as session:
                # Invalidate specific session; the user_id filter keeps a
                # caller from deactivating someone else's session.
                session_stmt = select(UserSession).where(
                    and_(
                        UserSession.id == session_uuid,
                        UserSession.user_id == user_id
                    )
                )
                session_result = await session.execute(session_stmt)
                user_session = session_result.scalar_one_or_none()
                if user_session:
                    user_session.is_active = False
                    user_session.logged_out_at = datetime.utcnow()
                    await session.commit()

        await logger.ainfo(
            "User logged out",
            user_id=str(user_id),
            session_id=session_id
        )
        return response.json({
            "message": "Logout successful",
            "timestamp": datetime.utcnow().isoformat()
        })
    except Exception as e:
        await logger.aerror(
            "Logout failed",
            user_id=str(request.ctx.user.id),
            error=str(e)
        )
        return response.json(
            {"error": "Logout failed", "code": "LOGOUT_FAILED"},
            status=500
        )
@auth_bp.route("/me", methods=["GET"])
@require_auth()
async def get_current_user(request: Request) -> JSONResponse:
    """
    Return the authenticated user's profile, roles, permissions, number of
    active sessions and active API keys.

    Args:
        request: Sanic request; ``request.ctx.user`` identifies the caller.

    Returns:
        JSONResponse: 200 with the user data; 404 if the user row no longer
        exists; 500 on any unexpected error.
    """
    try:
        caller = request.ctx.user
        async with db_manager.get_session() as db:
            # Load the user together with roles and API keys in one go.
            account_query = (
                select(User)
                .where(User.id == caller.id)
                .options(
                    selectinload(User.roles),
                    selectinload(User.api_keys)
                )
            )
            account = (await db.execute(account_query)).scalar_one_or_none()
            if not account:
                return response.json(
                    {"error": "User not found", "code": "USER_NOT_FOUND"},
                    status=404
                )

            # Role summaries and the flattened permission list.
            role_summaries = [
                {"name": role.name, "description": role.description}
                for role in account.roles
            ]
            granted = [
                permission
                for role in account.roles
                for permission in role.permissions
            ]

            # Sessions that are flagged active and not yet expired.
            live_sessions_query = select(UserSession).where(
                and_(
                    UserSession.user_id == caller.id,
                    UserSession.is_active == True,  # noqa: E712 - SQL expression
                    UserSession.expires_at > datetime.utcnow()
                )
            )
            live_sessions = (await db.execute(live_sessions_query)).scalars().all()

            last_login = account.last_login_at
            profile = {
                "id": str(account.id),
                "username": account.username,
                "email": account.email,
                "full_name": account.full_name,
                "bio": account.bio,
                "avatar_url": account.avatar_url,
                "is_active": account.is_active,
                "email_verified": account.email_verified,
                "created_at": account.created_at.isoformat(),
                "last_login_at": last_login.isoformat() if last_login else None,
                "login_count": account.login_count,
                "settings": account.settings
            }

            # Only keys still flagged active are listed; the key material
            # itself is never part of this payload.
            key_summaries = []
            for api_key in account.api_keys:
                if not api_key.is_active:
                    continue
                key_summaries.append({
                    "id": str(api_key.id),
                    "name": api_key.name,
                    "created_at": api_key.created_at.isoformat(),
                    "last_used_at": api_key.last_used_at.isoformat() if api_key.last_used_at else None,
                    "expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None
                })

            return response.json({
                "user": profile,
                "roles": role_summaries,
                "permissions": list(set(granted)),
                "active_sessions": len(live_sessions),
                "api_keys": key_summaries
            })
    except Exception as e:
        await logger.aerror(
            "Failed to get current user",
            user_id=str(request.ctx.user.id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to get user information", "code": "USER_INFO_FAILED"},
            status=500
        )
@auth_bp.route("/me", methods=["PUT"])
@require_auth()
@validate_request(UserUpdateSchema)
async def update_current_user(request: Request) -> JSONResponse:
    """
    Update the authenticated user's profile fields and, optionally, email.

    Args:
        request: Sanic request whose JSON body may carry ``full_name``,
            ``bio``, ``avatar_url``, ``settings`` and ``email``.

    Returns:
        JSONResponse: 200 with the updated profile; 404 if the user row no
        longer exists; 400 if the requested email belongs to another user;
        500 on any unexpected error.
    """
    try:
        user_id = request.ctx.user.id
        data = request.json or {}

        async with db_manager.get_session() as session:
            # Get current user
            user_stmt = select(User).where(User.id == user_id)
            user_result = await session.execute(user_stmt)
            user = user_result.scalar_one_or_none()
            if not user:
                return response.json(
                    {"error": "User not found", "code": "USER_NOT_FOUND"},
                    status=404
                )

            # Update allowed fields; free-text fields go through
            # sanitize_input, the others are stored as received.
            if "full_name" in data:
                user.full_name = sanitize_input(data["full_name"])
            if "bio" in data:
                user.bio = sanitize_input(data["bio"], max_length=500)
            for field in ("avatar_url", "settings"):
                if field in data:
                    setattr(user, field, data[field])

            # Handle email change (requires verification)
            if "email" in data and data["email"] != user.email:
                new_email = sanitize_input(data["email"])
                # Compare the SANITIZED value too: input that only differs
                # from the stored address before sanitizing is not a change
                # and must not reset the verification flag.
                if new_email != user.email:
                    # Check if email is already taken
                    email_stmt = select(User).where(
                        and_(User.email == new_email, User.id != user_id)
                    )
                    email_result = await session.execute(email_stmt)
                    if email_result.scalar_one_or_none():
                        return response.json(
                            {"error": "Email already in use", "code": "EMAIL_IN_USE"},
                            status=400
                        )
                    user.email = new_email
                    user.email_verified = False  # Require re-verification

            user.updated_at = datetime.utcnow()

            # Build the response payload before committing so it does not
            # depend on the session's expire-on-commit behaviour (its
            # configuration is not visible in this file).
            user_payload = {
                "id": str(user.id),
                "username": user.username,
                "email": user.email,
                "full_name": user.full_name,
                "bio": user.bio,
                "avatar_url": user.avatar_url,
                "updated_at": user.updated_at.isoformat()
            }
            await session.commit()

        await logger.ainfo(
            "User profile updated",
            user_id=str(user_id),
            updated_fields=list(data.keys())
        )
        return response.json({
            "message": "Profile updated successfully",
            "user": user_payload
        })
    except Exception as e:
        await logger.aerror(
            "Failed to update user profile",
            user_id=str(request.ctx.user.id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to update profile", "code": "UPDATE_FAILED"},
            status=500
        )
@auth_bp.route("/api-keys", methods=["POST"])
@rate_limit(limit=5, window=3600)  # 5 API keys per hour
@require_auth(permissions=["api.create"])
@validate_request(ApiKeySchema)
async def create_api_key(request: Request) -> JSONResponse:
    """
    Create a new API key for the authenticated user.

    Args:
        request: Sanic request whose JSON body carries ``name``,
            ``permissions`` and optionally ``expires_at`` (ISO 8601).

    Returns:
        JSONResponse: 201 with the key record and the key itself (returned
        only in this response); 400 if ``expires_at`` cannot be parsed or is
        not in the future; 500 on any unexpected error.
    """
    try:
        from datetime import timezone
        from app.core.models.user import ApiKey

        user_id = request.ctx.user.id
        data = request.json or {}

        # Parse the optional expiry ONCE and reuse it for both the key
        # lifetime and the stored record.
        expires_at = None
        expires_in = None
        raw_expiry = data.get("expires_at")
        if raw_expiry:
            try:
                # datetime.fromisoformat only accepts a trailing "Z" from
                # Python 3.11 on; rewrite it as an explicit UTC offset.
                iso_text = (
                    raw_expiry[:-1] + "+00:00"
                    if raw_expiry.endswith("Z")
                    else raw_expiry
                )
                expires_at = datetime.fromisoformat(iso_text)
            except (AttributeError, TypeError, ValueError):
                return response.json(
                    {"error": "Invalid expiration date", "code": "INVALID_EXPIRY"},
                    status=400
                )
            # The rest of this module works with naive UTC datetimes
            # (datetime.utcnow()); convert aware input so the subtraction
            # below cannot raise on mixed naive/aware operands.
            if expires_at.tzinfo is not None:
                expires_at = expires_at.astimezone(timezone.utc).replace(tzinfo=None)
            expires_in = int((expires_at - datetime.utcnow()).total_seconds())
            if expires_in <= 0:
                return response.json(
                    {"error": "Expiration date must be in the future", "code": "INVALID_EXPIRY"},
                    status=400
                )

        # NOTE(review): the requested permissions are stored as given; nothing
        # in this handler checks that they are a subset of the caller's own
        # permissions - confirm ApiKeySchema or generate_api_key enforces it.
        # Generate API key
        api_key = generate_api_key(
            user_id=user_id,
            permissions=data["permissions"],
            name=data["name"],
            expires_in=expires_in
        )

        async with db_manager.get_session() as session:
            # Create API key record; only a hash of the key tail is stored.
            new_api_key = ApiKey(
                id=uuid4(),
                user_id=user_id,
                name=sanitize_input(data["name"]),
                key_hash=hash_password(api_key[-32:]),  # Hash last 32 chars
                permissions=data["permissions"],
                expires_at=expires_at
            )
            session.add(new_api_key)
            await session.commit()
            await session.refresh(new_api_key)

            # Read the response fields while the row is freshly loaded.
            key_payload = {
                "id": str(new_api_key.id),
                "name": new_api_key.name,
                "key": api_key,  # Only returned once
                "permissions": new_api_key.permissions,
                "created_at": new_api_key.created_at.isoformat(),
                "expires_at": new_api_key.expires_at.isoformat() if new_api_key.expires_at else None
            }

        await logger.ainfo(
            "API key created",
            user_id=str(user_id),
            api_key_id=key_payload["id"],
            name=data["name"],
            permissions=data["permissions"]
        )
        return response.json({
            "message": "API key created successfully",
            "api_key": key_payload,
            "warning": "Save this API key securely. It will not be shown again."
        }, status=201)
    except Exception as e:
        await logger.aerror(
            "Failed to create API key",
            user_id=str(request.ctx.user.id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to create API key", "code": "API_KEY_FAILED"},
            status=500
        )
@auth_bp.route("/sessions", methods=["GET"])
@require_auth()
async def get_user_sessions(request: Request) -> JSONResponse:
    """
    List the authenticated user's sessions that are active and not expired,
    newest first.

    Args:
        request: Sanic request; ``request.ctx.user`` identifies the caller and
            the optional ``X-Session-ID`` header marks the current session.

    Returns:
        JSONResponse: 200 with ``sessions`` and ``total``; 500 on any
        unexpected error.
    """
    try:
        owner_id = request.ctx.user.id
        # Read once; each row is compared against it to flag "is_current".
        current_session_id = request.headers.get("X-Session-ID")

        async with db_manager.get_session() as db:
            live_filter = and_(
                UserSession.user_id == owner_id,
                UserSession.is_active == True,  # noqa: E712 - SQL expression
                UserSession.expires_at > datetime.utcnow()
            )
            query = (
                select(UserSession)
                .where(live_filter)
                .order_by(UserSession.created_at.desc())
            )
            rows = (await db.execute(query)).scalars().all()

            listing = [
                {
                    "id": str(row.id),
                    "ip_address": row.ip_address,
                    "user_agent": row.user_agent,
                    "created_at": row.created_at.isoformat(),
                    "last_used_at": row.last_used_at.isoformat() if row.last_used_at else None,
                    "expires_at": row.expires_at.isoformat(),
                    "remember_me": row.remember_me,
                    "is_current": str(row.id) == current_session_id
                }
                for row in rows
            ]

        return response.json({
            "sessions": listing,
            "total": len(listing)
        })
    except Exception as e:
        await logger.aerror(
            "Failed to get user sessions",
            user_id=str(request.ctx.user.id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to get sessions", "code": "SESSIONS_FAILED"},
            status=500
        )
@auth_bp.route("/sessions/<session_id:uuid>", methods=["DELETE"])
@require_auth()
async def revoke_session(request: Request, session_id: UUID) -> JSONResponse:
    """
    Deactivate one of the authenticated user's sessions.

    Args:
        request: Sanic request; ``request.ctx.user`` identifies the caller.
        session_id: UUID of the session to revoke, taken from the URL path.

    Returns:
        JSONResponse: 200 with the revoked session id; 404 if the caller owns
        no session with that id; 500 on any unexpected error.
    """
    try:
        owner_id = request.ctx.user.id
        async with db_manager.get_session() as db:
            # Match on both id and owner so another user's session is
            # reported as "not found" rather than revoked.
            lookup = select(UserSession).where(
                and_(
                    UserSession.id == session_id,
                    UserSession.user_id == owner_id
                )
            )
            target = (await db.execute(lookup)).scalar_one_or_none()
            if not target:
                not_found = {"error": "Session not found", "code": "SESSION_NOT_FOUND"}
                return response.json(not_found, status=404)

            target.is_active = False
            target.logged_out_at = datetime.utcnow()
            await db.commit()

        revoked_id = str(session_id)
        await logger.ainfo(
            "Session revoked",
            user_id=str(owner_id),
            session_id=revoked_id
        )
        return response.json({
            "message": "Session revoked successfully",
            "session_id": revoked_id
        })
    except Exception as e:
        await logger.aerror(
            "Failed to revoke session",
            user_id=str(request.ctx.user.id),
            session_id=str(session_id),
            error=str(e)
        )
        return response.json(
            {"error": "Failed to revoke session", "code": "REVOKE_FAILED"},
            status=500
        )