385 lines
12 KiB
Python
385 lines
12 KiB
Python
"""Redis caching system with fallback support."""
|
|
|
|
import json
|
|
import logging
|
|
import pickle
|
|
from typing import Any, Optional, Union, Dict, List
|
|
from contextlib import asynccontextmanager
|
|
from functools import wraps
|
|
|
|
import redis.asyncio as redis
|
|
from redis.asyncio import ConnectionPool
|
|
|
|
from app.core.config_compatible import get_settings
|
|
|
|
logger = logging.getLogger(__name__)

# Module-wide Redis handles. Both start as ``None``; ``init_cache()`` assigns
# them and ``close_cache()`` resets them to ``None``.
_redis_pool: Optional[ConnectionPool] = None
_redis_client: Optional[redis.Redis] = None
|
|
|
|
|
|
class CacheError(Exception):
    """Error type raised by the cache layer (e.g. on (de)serialization failure)."""
|
|
|
|
|
|
async def init_cache() -> None:
    """Initialize the module-level Redis connection pool and client.

    When ``settings.redis_enabled`` or ``settings.cache_enabled`` is false the
    function only logs that caching is disabled and returns.

    Otherwise a ``ConnectionPool`` is built from the ``redis_*`` settings, a
    client is created on top of it and the connection is verified with
    ``PING``. The module globals are assigned only after the ping succeeds, so
    an unverified client is never visible to the rest of the module.

    This function does not raise on connection problems: any exception is
    logged as a warning, the partially created pool is disconnected, and both
    globals are left as ``None`` (caching disabled).
    """
    global _redis_pool, _redis_client

    settings = get_settings()

    if not settings.redis_enabled or not settings.cache_enabled:
        logger.info("Redis caching is disabled")
        return

    pool: Optional[ConnectionPool] = None
    try:
        # Create connection pool
        pool = ConnectionPool(
            host=settings.redis_host,
            port=settings.redis_port,
            password=settings.redis_password,
            db=settings.redis_db,
            max_connections=settings.redis_max_connections,
            socket_timeout=settings.redis_socket_timeout,
            socket_connect_timeout=settings.redis_socket_connect_timeout,
            decode_responses=False,  # We'll handle encoding manually for flexibility
            retry_on_timeout=True,
            health_check_interval=30,
        )

        # Create Redis client and test the connection before publishing it
        client = redis.Redis(connection_pool=pool)
        await client.ping()
    except Exception as e:
        logger.warning(f"Failed to initialize Redis cache: {e}. Caching will be disabled.")
        _redis_pool = None
        _redis_client = None

        # Release any sockets the failed attempt may have opened; cleanup is
        # best-effort and must not turn a soft failure into a crash.
        if pool is not None:
            try:
                await pool.disconnect()
            except Exception as cleanup_error:
                logger.debug(f"Error disconnecting Redis pool after failed init: {cleanup_error}")
        return

    _redis_pool = pool
    _redis_client = client
    logger.info(f"Redis cache initialized successfully at {settings.redis_host}:{settings.redis_port}")
|
|
|
|
|
|
async def close_cache() -> None:
    """Close the Redis client and disconnect its connection pool.

    The module globals are cleared *before* any awaiting happens, so other
    coroutines see the cache as unavailable instead of using a client that is
    being torn down.

    The pool is disconnected explicitly: the client was created with an
    externally supplied ``connection_pool``, and redis-py only closes pools
    that the client created itself.

    Errors during shutdown are logged and swallowed; the function never raises.
    """
    global _redis_pool, _redis_client

    client, pool = _redis_client, _redis_pool
    _redis_client = None
    _redis_pool = None

    if client is None and pool is None:
        return

    try:
        try:
            if client is not None:
                await client.close()
        finally:
            # Runs even if closing the client failed, so sockets are released.
            if pool is not None:
                await pool.disconnect()
        logger.info("Redis cache connection closed")
    except Exception as e:
        logger.error(f"Error closing Redis cache: {e}")
|
|
|
|
|
|
def get_redis_client() -> Optional[redis.Redis]:
    """Return the module-level Redis client, or ``None`` when it is not set."""
    client = _redis_client
    return client
|
|
|
|
|
|
def is_cache_available() -> bool:
    """Return ``True`` when the module-level Redis client has been set."""
    if _redis_client is None:
        return False
    return True
|
|
|
|
|
|
class Cache:
    """Redis cache manager with fallback support.

    Every public coroutine is best-effort: when the cache is unavailable or a
    Redis call fails, the error is logged as a warning and a neutral value
    (``default``, ``False``, ``0``, ``None`` or ``{}``) is returned instead of
    raising.

    Keys passed by callers are namespaced through ``_make_key`` before they
    are sent to Redis.
    """

    # Page-size hint for SCAN and the maximum number of keys per DEL call in
    # ``clear_pattern``.
    _SCAN_BATCH_SIZE = 500

    def __init__(self):
        self.settings = get_settings()

    def _serialize(self, value: Any) -> bytes:
        """Serialize ``value`` to bytes for storage.

        ``str``, ``int``, ``float`` and ``bool`` values are stored as UTF-8
        encoded JSON; everything else is pickled.

        Raises:
            CacheError: if the value cannot be serialized.
        """
        try:
            if isinstance(value, (str, int, float, bool)):
                return json.dumps(value).encode('utf-8')
            return pickle.dumps(value)
        except Exception as e:
            logger.error(f"Failed to serialize cache value: {e}")
            raise CacheError(f"Serialization error: {e}") from e

    def _deserialize(self, data: bytes) -> Any:
        """Deserialize bytes previously produced by ``_serialize``.

        JSON is attempted first; if the payload is not valid UTF-8 JSON it is
        treated as a pickle.

        Raises:
            CacheError: if the payload can be decoded by neither format.
        """
        try:
            # Try JSON first (for simple types)
            try:
                return json.loads(data.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Fallback to pickle for complex objects.
                # SECURITY NOTE(review): pickle.loads executes arbitrary code
                # from the payload. This is only safe while nothing untrusted
                # can write to this Redis instance - confirm for deployment.
                return pickle.loads(data)
        except Exception as e:
            logger.error(f"Failed to deserialize cache value: {e}")
            raise CacheError(f"Deserialization error: {e}") from e

    def _make_key(self, key: str, prefix: str = "myuploader") -> str:
        """Return ``key`` namespaced as ``"<prefix>:<key>"``."""
        return f"{prefix}:{key}"

    async def get(self, key: str, default: Any = None) -> Any:
        """Return the cached value for ``key``.

        ``default`` is returned when the cache is unavailable, the key is
        missing, or reading/deserializing fails.
        """
        if not is_cache_available():
            return default

        try:
            redis_key = self._make_key(key)
            data = await _redis_client.get(redis_key)

            if data is None:
                return default

            return self._deserialize(data)

        except Exception as e:
            logger.warning(f"Cache get error for key '{key}': {e}")
            return default

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store ``value`` under ``key`` with an expiry.

        ``ttl`` is passed to ``SETEX``; when it is ``None``,
        ``settings.cache_default_ttl`` is used instead.

        Returns ``True`` on success, ``False`` when the cache is unavailable
        or the write fails.
        """
        if not is_cache_available():
            return False

        try:
            redis_key = self._make_key(key)
            data = self._serialize(value)

            if ttl is None:
                ttl = self.settings.cache_default_ttl

            await _redis_client.setex(redis_key, ttl, data)
            return True

        except Exception as e:
            logger.warning(f"Cache set error for key '{key}': {e}")
            return False

    async def delete(self, key: str) -> bool:
        """Delete ``key``; return ``True`` only if Redis reports a removal."""
        if not is_cache_available():
            return False

        try:
            redis_key = self._make_key(key)
            result = await _redis_client.delete(redis_key)
            return bool(result)

        except Exception as e:
            logger.warning(f"Cache delete error for key '{key}': {e}")
            return False

    async def exists(self, key: str) -> bool:
        """Return ``True`` if ``key`` exists; ``False`` otherwise or on error."""
        if not is_cache_available():
            return False

        try:
            redis_key = self._make_key(key)
            result = await _redis_client.exists(redis_key)
            return bool(result)

        except Exception as e:
            logger.warning(f"Cache exists error for key '{key}': {e}")
            return False

    async def expire(self, key: str, ttl: int) -> bool:
        """Set the expiry of ``key`` to ``ttl``.

        Returns the truthiness of the ``EXPIRE`` reply, or ``False`` when the
        cache is unavailable or the call fails.
        """
        if not is_cache_available():
            return False

        try:
            redis_key = self._make_key(key)
            result = await _redis_client.expire(redis_key, ttl)
            return bool(result)

        except Exception as e:
            logger.warning(f"Cache expire error for key '{key}': {e}")
            return False

    async def clear_pattern(self, pattern: str) -> int:
        """Delete every key matching ``pattern`` (after namespacing).

        Keys are discovered incrementally with ``SCAN`` rather than ``KEYS``,
        so a large keyspace does not block the Redis server, and are deleted
        in batches of at most ``_SCAN_BATCH_SIZE``.

        Returns the number of keys deleted. If an error occurs part-way, the
        count deleted so far is returned.
        """
        if not is_cache_available():
            return 0

        deleted = 0
        try:
            redis_pattern = self._make_key(pattern)
            batch = []

            async for redis_key in _redis_client.scan_iter(
                match=redis_pattern, count=self._SCAN_BATCH_SIZE
            ):
                batch.append(redis_key)
                if len(batch) >= self._SCAN_BATCH_SIZE:
                    deleted += await _redis_client.delete(*batch)
                    batch = []

            if batch:
                deleted += await _redis_client.delete(*batch)

            return deleted

        except Exception as e:
            logger.warning(f"Cache clear pattern error for pattern '{pattern}': {e}")
            return deleted

    async def increment(self, key: str, amount: int = 1, ttl: Optional[int] = None) -> Optional[int]:
        """Increment the counter at ``key`` by ``amount``.

        When ``ttl`` is given, ``EXPIRE`` is issued after every increment, so
        each call resets the key's time to live.

        Returns the ``INCRBY`` result, or ``None`` when the cache is
        unavailable or the call fails.
        """
        if not is_cache_available():
            return None

        try:
            redis_key = self._make_key(key)
            result = await _redis_client.incrby(redis_key, amount)

            # NOTE: INCRBY and EXPIRE are two separate commands; if the second
            # one fails the counter keeps whatever expiry it had before.
            if ttl is not None:
                await _redis_client.expire(redis_key, ttl)

            return result

        except Exception as e:
            logger.warning(f"Cache increment error for key '{key}': {e}")
            return None

    async def get_multiple(self, keys: List[str]) -> Dict[str, Any]:
        """Fetch several keys with a single ``MGET``.

        Returns a dict containing only the keys that were found and could be
        deserialized. An empty ``keys`` list returns ``{}`` without contacting
        Redis (``MGET`` with no keys is an error).
        """
        if not keys or not is_cache_available():
            return {}

        try:
            redis_keys = [self._make_key(key) for key in keys]
            values = await _redis_client.mget(redis_keys)

            result = {}
            for key, data in zip(keys, values):
                if data is None:
                    continue
                try:
                    result[key] = self._deserialize(data)
                except Exception as e:
                    # One corrupt entry must not discard the rest of the batch.
                    logger.warning(f"Failed to deserialize cached value for key '{key}': {e}")

            return result

        except Exception as e:
            logger.warning(f"Cache get_multiple error: {e}")
            return {}

    async def set_multiple(self, mapping: Dict[str, Any], ttl: Optional[int] = None) -> bool:
        """Store every ``key -> value`` pair of ``mapping`` via one pipeline.

        All entries share the same expiry: ``ttl``, or
        ``settings.cache_default_ttl`` when ``ttl`` is ``None``.

        Returns ``True`` on success (including an empty ``mapping``, which
        needs no round trip) and ``False`` when the cache is unavailable or
        anything fails.
        """
        if not is_cache_available():
            return False

        if not mapping:
            return True

        try:
            effective_ttl = self.settings.cache_default_ttl if ttl is None else ttl

            # Serialize everything up front so that a bad value aborts before
            # any command has been queued.
            entries = [
                (self._make_key(key), self._serialize(value))
                for key, value in mapping.items()
            ]

            pipeline = _redis_client.pipeline()
            for redis_key, data in entries:
                pipeline.setex(redis_key, effective_ttl, data)

            await pipeline.execute()
            return True

        except Exception as e:
            logger.warning(f"Cache set_multiple error: {e}")
            return False
|
|
|
|
|
|
# Shared module-level Cache instance
cache = Cache()
|
|
|
|
|
|
# Caching decorators
|
|
def cached(ttl: Optional[int] = None, key_prefix: str = "func"):
    """Decorator factory that caches the result of an async function.

    The cache key is ``key_prefix``, the function's ``__name__``, ``str()`` of
    each positional argument and ``name=value`` for each keyword argument
    (sorted by name), joined with ``":"``.

    Args:
        ttl: Expiry passed to ``cache.set``; ``None`` lets ``cache.set`` pick
            its default.
        key_prefix: First component of the generated cache key.

    When the cache is unavailable the wrapped function is simply awaited.

    A private sentinel is used as the lookup default, so a cached ``None``
    counts as a hit and does not re-run the function on every call.

    NOTE(review): arguments are keyed by ``str(arg)``. Objects whose ``str()``
    is not stable (e.g. a default ``repr`` with a memory address, such as
    ``self`` on a method) will likely never produce a cache hit - verify
    against callers.
    """
    # Distinguishes "nothing cached" from a legitimately cached ``None``.
    missing = object()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not is_cache_available():
                return await func(*args, **kwargs)

            # Create cache key from function name and arguments
            key_parts = [key_prefix, func.__name__]
            key_parts.extend(str(arg) for arg in args)
            key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
            cache_key = ":".join(key_parts)

            # Try to get from cache
            hit = await cache.get(cache_key, missing)
            if hit is not missing:
                return hit

            # Call function and cache result
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result, ttl)
            return result

        return wrapper

    return decorator
|
|
|
|
|
|
def cache_user_data(ttl: Optional[int] = None):
    """Return a ``cached`` decorator using the ``"user"`` key prefix.

    When ``ttl`` is ``None`` the value of ``get_settings().cache_user_ttl`` is
    used instead.
    """
    effective_ttl = get_settings().cache_user_ttl if ttl is None else ttl
    return cached(ttl=effective_ttl, key_prefix="user")
|
|
|
|
|
|
def cache_content_data(ttl: Optional[int] = None):
    """Return a ``cached`` decorator using the ``"content"`` key prefix.

    When ``ttl`` is ``None`` the value of ``get_settings().cache_content_ttl``
    is used instead.
    """
    effective_ttl = get_settings().cache_content_ttl if ttl is None else ttl
    return cached(ttl=effective_ttl, key_prefix="content")
|
|
|
|
|
|
# Cache health check
|
|
async def check_cache_health() -> Dict[str, Any]:
    """Check cache health and return a status dictionary.

    Returns one of:

    * ``{"status": "disabled", "available": False, "error": ...}`` when the
      Redis client is not initialized.
    * ``{"status": "healthy" | "degraded", "available": True,
      "test_passed": ..., ...}`` with selected fields of ``INFO`` when Redis
      answered. ``"degraded"`` means Redis answered ``INFO`` but the
      set/get round trip did not return the stored value.
    * ``{"status": "error", "available": False, "error": ...}`` when an
      exception was raised.
    """
    if not is_cache_available():
        return {
            "status": "disabled",
            "available": False,
            "error": "Redis not initialized"
        }

    # Local import: this is the only place in the module that needs uuid.
    import uuid

    try:
        # Use a unique key per probe so concurrent health checks cannot
        # delete each other's test value between the set and the get.
        test_key = f"health_check:{uuid.uuid4().hex}"
        test_value = {"timestamp": "test"}

        await cache.set(test_key, test_value, 10)
        retrieved = await cache.get(test_key)
        await cache.delete(test_key)

        # Get Redis info
        info = await _redis_client.info()

        # cache.set/cache.get swallow their own errors, so the comparison is
        # the only signal that the round trip actually worked.
        test_passed = retrieved == test_value

        return {
            "status": "healthy" if test_passed else "degraded",
            "available": True,
            "test_passed": test_passed,
            "connected_clients": info.get("connected_clients", 0),
            "used_memory": info.get("used_memory_human", "unknown"),
            "total_commands_processed": info.get("total_commands_processed", 0),
        }

    except Exception as e:
        return {
            "status": "error",
            "available": False,
            "error": str(e)
        }
|
|
|
|
|
|
# Context manager for cache operations
|
|
@asynccontextmanager
async def cache_context():
    """Async context manager that yields the shared ``cache`` object.

    Any ``Exception`` escaping the ``async with`` body is logged as an error
    and then re-raised unchanged.
    """
    try:
        yield cache
    except Exception as error:
        logger.error(f"Cache context error: {error}")
        raise