"""Redis caching system with fallback support.""" import json import logging import pickle from typing import Any, Optional, Union, Dict, List from contextlib import asynccontextmanager from functools import wraps import redis.asyncio as redis from redis.asyncio import ConnectionPool from app.core.config_compatible import get_settings logger = logging.getLogger(__name__) # Global Redis connection pool _redis_pool: Optional[ConnectionPool] = None _redis_client: Optional[redis.Redis] = None class CacheError(Exception): """Custom cache error.""" pass async def init_cache() -> None: """Initialize Redis cache connection.""" global _redis_pool, _redis_client settings = get_settings() if not settings.redis_enabled or not settings.cache_enabled: logger.info("Redis caching is disabled") return try: # Create connection pool _redis_pool = ConnectionPool( host=settings.redis_host, port=settings.redis_port, password=settings.redis_password, db=settings.redis_db, max_connections=settings.redis_max_connections, socket_timeout=settings.redis_socket_timeout, socket_connect_timeout=settings.redis_socket_connect_timeout, decode_responses=False, # We'll handle encoding manually for flexibility retry_on_timeout=True, health_check_interval=30, ) # Create Redis client _redis_client = redis.Redis(connection_pool=_redis_pool) # Test connection await _redis_client.ping() logger.info(f"Redis cache initialized successfully at {settings.redis_host}:{settings.redis_port}") except Exception as e: logger.warning(f"Failed to initialize Redis cache: {e}. Caching will be disabled.") _redis_pool = None _redis_client = None async def close_cache() -> None: """Close Redis cache connection.""" global _redis_pool, _redis_client if _redis_client: try: await _redis_client.close() logger.info("Redis cache connection closed") except Exception as e: logger.error(f"Error closing Redis cache: {e}") finally: _redis_client = None _redis_pool = None def get_redis_client() -> Optional[redis.Redis]: """Get Redis client instance.""" return _redis_client def is_cache_available() -> bool: """Check if cache is available.""" return _redis_client is not None class Cache: """Redis cache manager with fallback support.""" def __init__(self): self.settings = get_settings() def _serialize(self, value: Any) -> bytes: """Serialize value for storage.""" try: if isinstance(value, (str, int, float, bool)): return json.dumps(value).encode('utf-8') else: return pickle.dumps(value) except Exception as e: logger.error(f"Failed to serialize cache value: {e}") raise CacheError(f"Serialization error: {e}") def _deserialize(self, data: bytes) -> Any: """Deserialize value from storage.""" try: # Try JSON first (for simple types) try: return json.loads(data.decode('utf-8')) except (json.JSONDecodeError, UnicodeDecodeError): # Fallback to pickle for complex objects return pickle.loads(data) except Exception as e: logger.error(f"Failed to deserialize cache value: {e}") raise CacheError(f"Deserialization error: {e}") def _make_key(self, key: str, prefix: str = "myuploader") -> str: """Create cache key with prefix.""" return f"{prefix}:{key}" async def get(self, key: str, default: Any = None) -> Any: """Get value from cache.""" if not is_cache_available(): return default try: redis_key = self._make_key(key) data = await _redis_client.get(redis_key) if data is None: return default return self._deserialize(data) except Exception as e: logger.warning(f"Cache get error for key '{key}': {e}") return default async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: """Set value in cache.""" if not is_cache_available(): return False try: redis_key = self._make_key(key) data = self._serialize(value) if ttl is None: ttl = self.settings.cache_default_ttl await _redis_client.setex(redis_key, ttl, data) return True except Exception as e: logger.warning(f"Cache set error for key '{key}': {e}") return False async def delete(self, key: str) -> bool: """Delete value from cache.""" if not is_cache_available(): return False try: redis_key = self._make_key(key) result = await _redis_client.delete(redis_key) return bool(result) except Exception as e: logger.warning(f"Cache delete error for key '{key}': {e}") return False async def exists(self, key: str) -> bool: """Check if key exists in cache.""" if not is_cache_available(): return False try: redis_key = self._make_key(key) result = await _redis_client.exists(redis_key) return bool(result) except Exception as e: logger.warning(f"Cache exists error for key '{key}': {e}") return False async def expire(self, key: str, ttl: int) -> bool: """Set expiration time for key.""" if not is_cache_available(): return False try: redis_key = self._make_key(key) result = await _redis_client.expire(redis_key, ttl) return bool(result) except Exception as e: logger.warning(f"Cache expire error for key '{key}': {e}") return False async def clear_pattern(self, pattern: str) -> int: """Clear all keys matching pattern.""" if not is_cache_available(): return 0 try: redis_pattern = self._make_key(pattern) keys = await _redis_client.keys(redis_pattern) if keys: result = await _redis_client.delete(*keys) return result return 0 except Exception as e: logger.warning(f"Cache clear pattern error for pattern '{pattern}': {e}") return 0 async def increment(self, key: str, amount: int = 1, ttl: Optional[int] = None) -> Optional[int]: """Increment counter in cache.""" if not is_cache_available(): return None try: redis_key = self._make_key(key) result = await _redis_client.incrby(redis_key, amount) if ttl is not None: await _redis_client.expire(redis_key, ttl) return result except Exception as e: logger.warning(f"Cache increment error for key '{key}': {e}") return None async def get_multiple(self, keys: List[str]) -> Dict[str, Any]: """Get multiple values from cache.""" if not is_cache_available(): return {} try: redis_keys = [self._make_key(key) for key in keys] values = await _redis_client.mget(redis_keys) result = {} for i, (key, data) in enumerate(zip(keys, values)): if data is not None: try: result[key] = self._deserialize(data) except Exception as e: logger.warning(f"Failed to deserialize cached value for key '{key}': {e}") return result except Exception as e: logger.warning(f"Cache get_multiple error: {e}") return {} async def set_multiple(self, mapping: Dict[str, Any], ttl: Optional[int] = None) -> bool: """Set multiple values in cache.""" if not is_cache_available(): return False try: pipeline = _redis_client.pipeline() for key, value in mapping.items(): redis_key = self._make_key(key) data = self._serialize(value) if ttl is None: ttl = self.settings.cache_default_ttl pipeline.setex(redis_key, ttl, data) await pipeline.execute() return True except Exception as e: logger.warning(f"Cache set_multiple error: {e}") return False # Global cache instance cache = Cache() # Caching decorators def cached(ttl: Optional[int] = None, key_prefix: str = "func"): """Decorator for caching function results.""" def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): if not is_cache_available(): return await func(*args, **kwargs) # Create cache key from function name and arguments key_parts = [key_prefix, func.__name__] if args: key_parts.extend([str(arg) for arg in args]) if kwargs: key_parts.extend([f"{k}={v}" for k, v in sorted(kwargs.items())]) cache_key = ":".join(key_parts) # Try to get from cache result = await cache.get(cache_key) if result is not None: return result # Call function and cache result result = await func(*args, **kwargs) await cache.set(cache_key, result, ttl) return result return wrapper return decorator def cache_user_data(ttl: Optional[int] = None): """Decorator for caching user-specific data.""" if ttl is None: ttl = get_settings().cache_user_ttl return cached(ttl=ttl, key_prefix="user") def cache_content_data(ttl: Optional[int] = None): """Decorator for caching content data.""" if ttl is None: ttl = get_settings().cache_content_ttl return cached(ttl=ttl, key_prefix="content") # Cache health check async def check_cache_health() -> Dict[str, Any]: """Check cache health and return status.""" if not is_cache_available(): return { "status": "disabled", "available": False, "error": "Redis not initialized" } try: # Test basic operations test_key = "health_check" test_value = {"timestamp": "test"} await cache.set(test_key, test_value, 10) retrieved = await cache.get(test_key) await cache.delete(test_key) # Get Redis info info = await _redis_client.info() return { "status": "healthy", "available": True, "test_passed": retrieved == test_value, "connected_clients": info.get("connected_clients", 0), "used_memory": info.get("used_memory_human", "unknown"), "total_commands_processed": info.get("total_commands_processed", 0), } except Exception as e: return { "status": "error", "available": False, "error": str(e) } # Context manager for cache operations @asynccontextmanager async def cache_context(): """Context manager for cache operations.""" try: yield cache except Exception as e: logger.error(f"Cache context error: {e}") raise