# uploader-bot/app/core/cache.py
"""Redis caching system with fallback support."""
import json
import logging
import pickle
from contextlib import asynccontextmanager
from functools import wraps
from typing import Any, Dict, List, Optional

import redis.asyncio as redis
from redis.asyncio import ConnectionPool

from app.core.config_compatible import get_settings

logger = logging.getLogger(__name__)

# Global Redis connection pool
_redis_pool: Optional[ConnectionPool] = None
_redis_client: Optional[redis.Redis] = None


class CacheError(Exception):
    """Custom cache error."""
    pass


async def init_cache() -> None:
    """Initialize Redis cache connection."""
    global _redis_pool, _redis_client
    settings = get_settings()
    if not settings.redis_enabled or not settings.cache_enabled:
        logger.info("Redis caching is disabled")
        return
    try:
        # Create connection pool
        _redis_pool = ConnectionPool(
            host=settings.redis_host,
            port=settings.redis_port,
            password=settings.redis_password,
            db=settings.redis_db,
            max_connections=settings.redis_max_connections,
            socket_timeout=settings.redis_socket_timeout,
            socket_connect_timeout=settings.redis_socket_connect_timeout,
            decode_responses=False,  # We'll handle encoding manually for flexibility
            retry_on_timeout=True,
            health_check_interval=30,
        )
        # Create Redis client
        _redis_client = redis.Redis(connection_pool=_redis_pool)
        # Test connection
        await _redis_client.ping()
        logger.info(f"Redis cache initialized successfully at {settings.redis_host}:{settings.redis_port}")
    except Exception as e:
        logger.warning(f"Failed to initialize Redis cache: {e}. Caching will be disabled.")
        _redis_pool = None
        _redis_client = None


async def close_cache() -> None:
    """Close Redis cache connection."""
    global _redis_pool, _redis_client
    if _redis_client:
        try:
            await _redis_client.close()
            logger.info("Redis cache connection closed")
        except Exception as e:
            logger.error(f"Error closing Redis cache: {e}")
        finally:
            _redis_client = None
            _redis_pool = None
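

# A minimal lifecycle sketch (illustrative only; `run_app` is a hypothetical
# application entrypoint, not part of this module):
#
#     async def main() -> None:
#         await init_cache()  # degrades to no-op caching if Redis is unreachable
#         try:
#             await run_app()
#         finally:
#             await close_cache()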


def get_redis_client() -> Optional[redis.Redis]:
    """Get Redis client instance."""
    return _redis_client


def is_cache_available() -> bool:
    """Check if cache is available."""
    return _redis_client is not None


class Cache:
    """Redis cache manager with fallback support."""

    def __init__(self):
        self.settings = get_settings()

    def _serialize(self, value: Any) -> bytes:
        """Serialize value for storage."""
        try:
            if isinstance(value, (str, int, float, bool)):
                # Simple scalars are stored as JSON so they stay readable in Redis
                return json.dumps(value).encode('utf-8')
            else:
                # Complex objects fall back to pickle
                return pickle.dumps(value)
        except Exception as e:
            logger.error(f"Failed to serialize cache value: {e}")
            raise CacheError(f"Serialization error: {e}") from e

    def _deserialize(self, data: bytes) -> Any:
        """Deserialize value from storage."""
        try:
            # Try JSON first (for simple types)
            try:
                return json.loads(data.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Fall back to pickle for complex objects. Note: unpickling
                # executes arbitrary code, so the Redis instance must be trusted.
                return pickle.loads(data)
        except Exception as e:
            logger.error(f"Failed to deserialize cache value: {e}")
            raise CacheError(f"Deserialization error: {e}") from e

    def _make_key(self, key: str, prefix: str = "myuploader") -> str:
        """Create cache key with prefix."""
        return f"{prefix}:{key}"

    async def get(self, key: str, default: Any = None) -> Any:
        """Get value from cache."""
        if not is_cache_available():
            return default
        try:
            redis_key = self._make_key(key)
            data = await _redis_client.get(redis_key)
            if data is None:
                return default
            return self._deserialize(data)
        except Exception as e:
            logger.warning(f"Cache get error for key '{key}': {e}")
            return default

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Set value in cache."""
        if not is_cache_available():
            return False
        try:
            redis_key = self._make_key(key)
            data = self._serialize(value)
            if ttl is None:
                ttl = self.settings.cache_default_ttl
            await _redis_client.setex(redis_key, ttl, data)
            return True
        except Exception as e:
            logger.warning(f"Cache set error for key '{key}': {e}")
            return False
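
    # Typical usage through the module-level `cache` instance defined below
    # (a sketch; the key name and TTL are illustrative):
    #
    #     await cache.set("user:42:profile", profile_dict, ttl=300)
    #     profile = await cache.get("user:42:profile", default=None)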

    async def delete(self, key: str) -> bool:
        """Delete value from cache."""
        if not is_cache_available():
            return False
        try:
            redis_key = self._make_key(key)
            result = await _redis_client.delete(redis_key)
            return bool(result)
        except Exception as e:
            logger.warning(f"Cache delete error for key '{key}': {e}")
            return False

    async def exists(self, key: str) -> bool:
        """Check if key exists in cache."""
        if not is_cache_available():
            return False
        try:
            redis_key = self._make_key(key)
            result = await _redis_client.exists(redis_key)
            return bool(result)
        except Exception as e:
            logger.warning(f"Cache exists error for key '{key}': {e}")
            return False

    async def expire(self, key: str, ttl: int) -> bool:
        """Set expiration time for key."""
        if not is_cache_available():
            return False
        try:
            redis_key = self._make_key(key)
            result = await _redis_client.expire(redis_key, ttl)
            return bool(result)
        except Exception as e:
            logger.warning(f"Cache expire error for key '{key}': {e}")
            return False

    async def clear_pattern(self, pattern: str) -> int:
        """Clear all keys matching pattern."""
        if not is_cache_available():
            return 0
        try:
            redis_pattern = self._make_key(pattern)
            # KEYS is O(N) over the whole keyspace and blocks Redis while it
            # runs; prefer SCAN-based iteration for large production datasets.
            keys = await _redis_client.keys(redis_pattern)
            if keys:
                return await _redis_client.delete(*keys)
            return 0
        except Exception as e:
            logger.warning(f"Cache clear pattern error for pattern '{pattern}': {e}")
            return 0
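
    # Example: invalidate a whole namespace at once (illustrative key layout):
    #
    #     await cache.clear_pattern("user:42:*")  # deletes myuploader:user:42:*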

    async def increment(self, key: str, amount: int = 1, ttl: Optional[int] = None) -> Optional[int]:
        """Increment counter in cache."""
        if not is_cache_available():
            return None
        try:
            redis_key = self._make_key(key)
            result = await _redis_client.incrby(redis_key, amount)
            # INCRBY and EXPIRE are separate commands, so setting the TTL
            # is not atomic with the increment.
            if ttl is not None:
                await _redis_client.expire(redis_key, ttl)
            return result
        except Exception as e:
            logger.warning(f"Cache increment error for key '{key}': {e}")
            return None
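
    # Example: a simple fixed-window rate limiter built on increment()
    # (a sketch; the 60-second window and limit of 20 are illustrative):
    #
    #     count = await cache.increment(f"ratelimit:{user_id}", ttl=60)
    #     if count is not None and count > 20:
    #         ...  # reject the request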

    async def get_multiple(self, keys: List[str]) -> Dict[str, Any]:
        """Get multiple values from cache."""
        if not is_cache_available():
            return {}
        try:
            redis_keys = [self._make_key(key) for key in keys]
            values = await _redis_client.mget(redis_keys)
            result = {}
            for key, data in zip(keys, values):
                if data is not None:
                    try:
                        result[key] = self._deserialize(data)
                    except Exception as e:
                        logger.warning(f"Failed to deserialize cached value for key '{key}': {e}")
            return result
        except Exception as e:
            logger.warning(f"Cache get_multiple error: {e}")
            return {}

    async def set_multiple(self, mapping: Dict[str, Any], ttl: Optional[int] = None) -> bool:
        """Set multiple values in cache."""
        if not is_cache_available():
            return False
        try:
            if ttl is None:
                ttl = self.settings.cache_default_ttl
            pipeline = _redis_client.pipeline()
            for key, value in mapping.items():
                redis_key = self._make_key(key)
                data = self._serialize(value)
                pipeline.setex(redis_key, ttl, data)
            await pipeline.execute()
            return True
        except Exception as e:
            logger.warning(f"Cache set_multiple error: {e}")
            return False


# Global cache instance
cache = Cache()
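
# Batch example (a sketch; keys and values are illustrative):
#
#     await cache.set_multiple({"a": 1, "b": [2, 3]}, ttl=120)
#     values = await cache.get_multiple(["a", "b", "missing"])
#     # -> {"a": 1, "b": [2, 3]}  (missing keys are simply omitted)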


# Caching decorators

def cached(ttl: Optional[int] = None, key_prefix: str = "func"):
    """Decorator for caching the results of async functions."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not is_cache_available():
                return await func(*args, **kwargs)
            # Create cache key from function name and arguments
            key_parts = [key_prefix, func.__name__]
            if args:
                key_parts.extend([str(arg) for arg in args])
            if kwargs:
                key_parts.extend([f"{k}={v}" for k, v in sorted(kwargs.items())])
            cache_key = ":".join(key_parts)
            # Try to get from cache. Note: a cached value of None is
            # indistinguishable from a cache miss, so functions that
            # legitimately return None are re-executed on every call.
            result = await cache.get(cache_key)
            if result is not None:
                return result
            # Call function and cache result
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
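
# Example use of @cached (a sketch; `load_stats` is hypothetical). Arguments
# should have stable str() representations, since they become part of the key:
#
#     @cached(ttl=600, key_prefix="stats")
#     async def load_stats(channel_id: int) -> dict:
#         ...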


def cache_user_data(ttl: Optional[int] = None):
    """Decorator for caching user-specific data."""
    if ttl is None:
        ttl = get_settings().cache_user_ttl
    return cached(ttl=ttl, key_prefix="user")


def cache_content_data(ttl: Optional[int] = None):
    """Decorator for caching content data."""
    if ttl is None:
        ttl = get_settings().cache_content_ttl
    return cached(ttl=ttl, key_prefix="content")


# Cache health check

async def check_cache_health() -> Dict[str, Any]:
    """Check cache health and return status."""
    if not is_cache_available():
        return {
            "status": "disabled",
            "available": False,
            "error": "Redis not initialized",
        }
    try:
        # Test basic operations
        test_key = "health_check"
        test_value = {"timestamp": "test"}
        await cache.set(test_key, test_value, 10)
        retrieved = await cache.get(test_key)
        await cache.delete(test_key)
        # Get Redis server info
        info = await _redis_client.info()
        return {
            "status": "healthy",
            "available": True,
            "test_passed": retrieved == test_value,
            "connected_clients": info.get("connected_clients", 0),
            "used_memory": info.get("used_memory_human", "unknown"),
            "total_commands_processed": info.get("total_commands_processed", 0),
        }
    except Exception as e:
        return {
            "status": "error",
            "available": False,
            "error": str(e),
        }
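
# Example: surface the health check from a diagnostics handler (a sketch;
# no particular web/bot framework is assumed here):
#
#     async def handle_health_request() -> dict:
#         return await check_cache_health()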


# Context manager for cache operations

@asynccontextmanager
async def cache_context():
    """Context manager for cache operations."""
    try:
        yield cache
    except Exception as e:
        logger.error(f"Cache context error: {e}")
        raise
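
# Example usage of cache_context (a sketch; key and value are illustrative):
#
#     async with cache_context() as c:
#         await c.set("session:abc", {"step": 1}, ttl=900)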