"""
Structured logging configuration with monitoring and observability
"""

import asyncio
import functools
import json
import logging
import sys
import time
from contextvars import ContextVar
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union

import structlog
import structlog.dev
from structlog.stdlib import LoggerFactory
from structlog.typing import EventDict, Processor

from app.core.config import settings, LOG_DIR, LOG_LEVEL

# Context variables for request tracking
request_id_var: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
user_id_var: ContextVar[Optional[int]] = ContextVar('user_id', default=None)
operation_var: ContextVar[Optional[str]] = ContextVar('operation', default=None)


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same kind of value as the deprecated ``datetime.utcnow()``
    (UTC wall time, ``tzinfo`` is ``None``).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class RequestContextProcessor:
    """Add request context to log records"""

    def __call__(self, logger, method_name, event_dict: EventDict) -> EventDict:
        """Copy the request/user/operation context variables into the event.

        A variable is added only when it is set (not ``None``). The check is
        ``is not None`` rather than truthiness so that a legitimate falsy
        value such as ``user_id == 0`` is still logged.
        """
        request_id = request_id_var.get()
        if request_id is not None:
            event_dict['request_id'] = request_id

        user_id = user_id_var.get()
        if user_id is not None:
            event_dict['user_id'] = user_id

        operation = operation_var.get()
        if operation is not None:
            event_dict['operation'] = operation

        return event_dict


class TimestampProcessor:
    """Add consistent timestamp to log records"""

    def __call__(self, logger, method_name, event_dict: EventDict) -> EventDict:
        """Set ``event_dict['timestamp']`` to the current UTC time.

        The value is an ISO-8601 string of the naive UTC time with a literal
        ``'Z'`` suffix appended.
        """
        event_dict['timestamp'] = _utcnow_naive().isoformat() + 'Z'
        return event_dict


class SecurityProcessor:
    """Filter sensitive data from logs"""

    # A key is treated as sensitive when any of these fragments occurs
    # anywhere in its lower-cased name (substring match, not equality).
    SENSITIVE_KEYS = {
        'password', 'token', 'key', 'secret', 'auth', 'credential',
        'private_key', 'seed', 'mnemonic', 'api_key', 'authorization'
    }

    def __call__(self, logger, method_name, event_dict: EventDict) -> EventDict:
        """Return a filtered copy of ``event_dict`` with sensitive values masked."""
        return self._filter_dict(event_dict)

    def _is_sensitive(self, key: Any) -> bool:
        """Return True when ``key`` contains one of ``SENSITIVE_KEYS``.

        The key is converted with ``str()`` first: nested dictionaries may
        carry non-string keys, and a logging processor must not raise on them.
        """
        lowered = str(key).lower()
        return any(fragment in lowered for fragment in self.SENSITIVE_KEYS)

    def _filter_value(self, value: Any) -> Any:
        """Filter one value, descending into dicts and (nested) lists."""
        if isinstance(value, dict):
            return self._filter_dict(value)
        if isinstance(value, list):
            return [self._filter_value(item) for item in value]
        return value

    def _filter_dict(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Recursively filter sensitive data.

        Args:
            data: Mapping to filter. Anything that is not a ``dict`` is
                returned unchanged.

        Returns:
            A new dict in which the value of every sensitive key is replaced
            by ``'***REDACTED***'`` and every other value has been filtered
            recursively. The input mapping is not modified.
        """
        if not isinstance(data, dict):
            return data

        return {
            key: '***REDACTED***' if self._is_sensitive(key) else self._filter_value(value)
            for key, value in data.items()
        }


class PerformanceProcessor:
    """Add performance metrics to log records"""

    def __init__(self) -> None:
        self._psutil = None          # psutil module once imported, else None
        self._process = None         # cached psutil.Process for this process
        self._psutil_checked = False

    def _get_process(self):
        """Return the cached ``psutil.Process`` or ``None`` if unavailable.

        The handle is created once and reused: ``Process.cpu_percent()``
        measures CPU usage since the previous call on the *same* object, so
        building a fresh ``Process`` for every log event would make it report
        the meaningless first-call value every time.

        NOTE(review): the handle is bound to the PID at creation time; if the
        application forks after the first log event, confirm this is acceptable.
        """
        if not self._psutil_checked:
            self._psutil_checked = True
            try:
                import psutil
            except ImportError:
                return None
            self._psutil = psutil
            try:
                process = psutil.Process()
                # Prime the counter; the first reading is documented as
                # meaningless and is discarded here.
                process.cpu_percent()
            except psutil.Error:
                return None
            self._process = process
        return self._process

    def __call__(self, logger, method_name, event_dict: EventDict) -> EventDict:
        """Add ``memory_mb`` and ``cpu_percent`` to the event when psutil is usable.

        The event is returned untouched when psutil is not installed or the
        process metrics cannot be read.
        """
        process = self._get_process()
        if process is None:
            return event_dict

        try:
            memory_mb = round(process.memory_info().rss / 1024 / 1024, 2)
            cpu_percent = process.cpu_percent()
        except self._psutil.Error:
            # Metrics are best-effort; never let them break a log call.
            return event_dict

        event_dict['memory_mb'] = memory_mb
        event_dict['cpu_percent'] = cpu_percent
        return event_dict


class MetricsCollector:
    """Collect metrics from log events"""

    def __init__(self) -> None:
        self.counters: Dict[str, int] = {}
        self.timers: Dict[str, float] = {}
        self.errors: Dict[str, int] = {}

    def increment_counter(self, metric: str, value: int = 1) -> None:
        """Add ``value`` to the counter ``metric`` (created at 0 if missing)."""
        self.counters[metric] = self.counters.get(metric, 0) + value

    def record_timer(self, metric: str, duration: float) -> None:
        """Store ``duration`` for ``metric``, replacing any previous value."""
        self.timers[metric] = duration

    def record_error(self, error_type: str) -> None:
        """Increment the occurrence count for ``error_type`` by one."""
        self.errors[error_type] = self.errors.get(error_type, 0) + 1

    def get_metrics(self) -> Dict[str, Any]:
        """Return the counters, timers and errors dictionaries.

        The returned mapping references the collector's own dictionaries,
        not copies.
        """
        return {
            'counters': self.counters,
            'timers': self.timers,
            'errors': self.errors
        }


# Global metrics collector
metrics_collector = MetricsCollector()


class DatabaseLogHandler(logging.Handler):
    """Log handler that stores critical logs in database"""

    def __init__(self):
        super().__init__()
        self.setLevel(logging.ERROR)
        self._queue = asyncio.Queue(maxsize=1000)
        self._task = None

    def emit(self, record: logging.LogRecord):
        """Build a log entry from ``record`` and enqueue it.

        The entry is dropped when the queue is full. Failures are routed to
        ``logging.Handler.handleError`` (the standard handler convention)
        instead of being swallowed silently; they never propagate to the
        caller.
        """
        try:
            log_entry = {
                'timestamp': _utcnow_naive(),
                'level': record.levelname,
                'logger': record.name,
                'message': record.getMessage(),
                'module': record.module,
                'function': record.funcName,
                'line': record.lineno,
                'request_id': getattr(record, 'request_id', None),
                'user_id': getattr(record, 'user_id', None),
                'extra': getattr(record, '__dict__', {})
            }

            if not self._queue.full():
                self._queue.put_nowait(log_entry)
        except Exception:
            # Don't let logging errors break the application
            self.handleError(record)

    async def process_logs(self):
        """Process logs from queue and store in database.

        Runs forever, taking one entry at a time from the queue. Persisting
        the entry is not implemented yet (see the commented template below).
        """
        from app.core.database import get_db_session

        while True:
            try:
                log_entry = await self._queue.get()

                # Store in database (implement based on your log model)
                # async with get_db_session() as session:
                #     log_record = LogRecord(**log_entry)
                #     session.add(log_record)
                #     await session.commit()

            except Exception as e:
                # Log to stderr to avoid infinite recursion
                print(f"Database log handler error: {e}", file=sys.stderr)
                await asyncio.sleep(0.1)


def configure_logging() -> None:
    """Configure structured logging.

    Sets up the standard-library root logger, lowers the verbosity of a few
    third-party loggers, configures the structlog processor chain (console
    renderer plus performance data when ``settings.DEBUG`` is true, JSON
    otherwise) and, outside debug mode, attaches a dated file handler under
    ``LOG_DIR``.

    Raises:
        AttributeError: If ``LOG_LEVEL`` does not name a ``logging`` level.
    """
    level = getattr(logging, LOG_LEVEL.upper())

    # Configure standard library logging
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=level
    )

    # Silence noisy loggers
    for noisy_logger in ("sqlalchemy.engine", "aioredis", "aiogram"):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)

    # Configure processors based on environment
    processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        RequestContextProcessor(),
        TimestampProcessor(),
        SecurityProcessor(),
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
    ]

    if settings.DEBUG:
        processors.extend([
            PerformanceProcessor(),
            structlog.dev.ConsoleRenderer(colors=True)
        ])
    else:
        processors.append(structlog.processors.JSONRenderer())

    # Configure structlog
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(level),
        logger_factory=LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # Add file handler for persistent logging
    if not settings.DEBUG:
        log_dir = Path(LOG_DIR)
        # FileHandler does not create missing directories.
        log_dir.mkdir(parents=True, exist_ok=True)
        log_file = log_dir / f"app_{datetime.now().strftime('%Y%m%d')}.log"
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(
            logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
        )
        logging.getLogger().addHandler(file_handler)


class LoggerMixin:
    """Mixin to add structured logging to classes"""

    @property
    def logger(self):
        """Return a structlog logger named after the concrete class."""
        return structlog.get_logger(self.__class__.__name__)


class AsyncContextLogger:
    """Context manager for async operations with automatic logging"""

    def __init__(
        self,
        operation: str,
        logger: Optional[structlog.BoundLogger] = None,
        log_args: bool = True,
        log_result: bool = True
    ):
        """Create the context manager.

        Args:
            operation: Name used in log events and metric keys.
            logger: Logger to use; defaults to ``structlog.get_logger()``.
            log_args: Stored on the instance; not consulted by this class.
            log_result: Stored on the instance; not consulted by this class.
        """
        self.operation = operation
        self.logger = logger or structlog.get_logger()
        self.log_args = log_args
        self.log_result = log_result
        self.start_time = None
        self._operation_token = None

    async def __aenter__(self):
        """Record the start time, publish the operation name and log the start."""
        self.start_time = time.time()
        # Keep the token so the previous value can be restored on exit.
        self._operation_token = operation_var.set(self.operation)

        self.logger.info(
            "Operation started",
            operation=self.operation,
        )

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Log completion or failure with the elapsed time and update metrics.

        Exceptions raised inside the ``async with`` body are logged and
        counted but not suppressed.
        """
        duration = time.time() - self.start_time

        try:
            if exc_type:
                self.logger.error(
                    "Operation failed",
                    operation=self.operation,
                    duration_ms=round(duration * 1000, 2),
                    error_type=exc_type.__name__,
                    error_message=str(exc_val)
                )
                metrics_collector.record_error(f"{self.operation}_error")
            else:
                self.logger.info(
                    "Operation completed",
                    operation=self.operation,
                    duration_ms=round(duration * 1000, 2)
                )

            metrics_collector.record_timer(f"{self.operation}_duration", duration)
        finally:
            self._restore_operation()

    def _restore_operation(self) -> None:
        """Put ``operation_var`` back to the value it had before ``__aenter__``.

        Restoring via the token (instead of setting ``None``) keeps the outer
        operation name intact when these context managers are nested.
        """
        token, self._operation_token = self._operation_token, None
        if token is None:
            operation_var.set(None)
            return
        try:
            operation_var.reset(token)
        except ValueError:
            # The token was created in a different Context (enter and exit
            # ran in different contexts); fall back to clearing the value.
            operation_var.set(None)


def get_logger(name: Optional[str] = None) -> structlog.BoundLogger:
    """Get configured structured logger"""
    return structlog.get_logger(name)


# Compatibility wrapper for old logging
def make_log(
    component: Optional[str],
    message: str,
    level: str = 'info',
    **kwargs
):
    """Legacy logging function for backward compatibility.

    Args:
        component: Logger name; ``'Legacy'`` is used when falsy.
        message: Event message.
        level: Name of the logger method to call (case-insensitive). Falls
            back to ``info`` when the logger has no such attribute.
        **kwargs: Extra key/value pairs attached to the event.
    """
    logger = get_logger(component or 'Legacy')
    log_func = getattr(logger, level.lower(), logger.info)
    log_func(message, **kwargs)


# Performance monitoring decorator
def log_performance(operation: Optional[str] = None):
    """Decorator to log function performance.

    Coroutine functions are run inside ``AsyncContextLogger``; regular
    functions are timed inline. In both cases the exception, if any, is
    logged and re-raised. ``functools.wraps`` preserves the wrapped
    function's name, docstring and signature metadata.

    Args:
        operation: Name used in log events; defaults to
            ``"<module>.<function name>"`` of the decorated function.
    """
    def decorator(func):
        op_name = operation or f"{func.__module__}.{func.__name__}"

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            async with AsyncContextLogger(op_name):
                return await func(*args, **kwargs)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time()
            logger = get_logger(func.__module__)

            try:
                logger.info("Function started", function=op_name)
                result = func(*args, **kwargs)

                duration = time.time() - start_time
                logger.info(
                    "Function completed",
                    function=op_name,
                    duration_ms=round(duration * 1000, 2)
                )

                return result

            except Exception as e:
                duration = time.time() - start_time
                logger.error(
                    "Function failed",
                    function=op_name,
                    duration_ms=round(duration * 1000, 2),
                    error=str(e)
                )
                raise

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator


# Initialize logging
configure_logging()