"""Prometheus metrics collection for my-uploader-bot.""" import logging import time from datetime import datetime from functools import wraps from typing import Dict, Any, Optional, Callable from prometheus_client import Counter, Histogram, Gauge, Info, generate_latest, CONTENT_TYPE_LATEST from fastapi import Request from fastapi.responses import Response logger = logging.getLogger(__name__) # Application info APP_INFO = Info('myuploader_app_info', 'Application information') APP_INFO.info({ 'version': '2.0.0', 'name': 'my-uploader-bot', 'python_version': '3.11+' }) # HTTP request metrics HTTP_REQUESTS_TOTAL = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status_code'] ) HTTP_REQUEST_DURATION = Histogram( 'http_request_duration_seconds', 'HTTP request duration in seconds', ['method', 'endpoint'] ) HTTP_REQUEST_SIZE = Histogram( 'http_request_size_bytes', 'HTTP request size in bytes', ['method', 'endpoint'] ) HTTP_RESPONSE_SIZE = Histogram( 'http_response_size_bytes', 'HTTP response size in bytes', ['method', 'endpoint'] ) # Authentication metrics AUTH_LOGIN_ATTEMPTS_TOTAL = Counter( 'auth_login_attempts_total', 'Total login attempts', ['status'] ) AUTH_LOGIN_FAILURES_TOTAL = Counter( 'auth_login_failures_total', 'Total login failures', ['reason'] ) AUTH_API_KEY_USAGE_TOTAL = Counter( 'auth_api_key_usage_total', 'Total API key usage', ['key_id', 'status'] ) # File upload metrics UPLOAD_REQUESTS_TOTAL = Counter( 'upload_requests_total', 'Total upload requests', ['status', 'file_type'] ) UPLOAD_SIZE_BYTES = Histogram( 'upload_size_bytes', 'File upload size in bytes', ['file_type'] ) UPLOAD_DURATION_SECONDS = Histogram( 'upload_duration_seconds', 'File upload duration in seconds', ['file_type'] ) UPLOAD_QUEUE_SIZE = Gauge( 'upload_queue_size', 'Number of files in upload queue' ) UPLOAD_FAILURES_TOTAL = Counter( 'upload_failures_total', 'Total upload failures', ['reason', 'file_type'] ) # File processing metrics PROCESSING_QUEUE_SIZE = Gauge( 'processing_queue_size', 'Number of files in processing queue' ) PROCESSING_DURATION_SECONDS = Histogram( 'processing_duration_seconds', 'File processing duration in seconds', ['file_type', 'operation'] ) PROCESSING_FAILURES_TOTAL = Counter( 'processing_failures_total', 'Total processing failures', ['file_type', 'operation'] ) # Database metrics DB_CONNECTIONS_ACTIVE = Gauge( 'db_connections_active', 'Number of active database connections' ) DB_CONNECTIONS_IDLE = Gauge( 'db_connections_idle', 'Number of idle database connections' ) DB_QUERY_DURATION_SECONDS = Histogram( 'db_query_duration_seconds', 'Database query duration in seconds', ['operation'] ) DB_TRANSACTIONS_TOTAL = Counter( 'db_transactions_total', 'Total database transactions', ['status'] ) # Cache metrics CACHE_OPERATIONS_TOTAL = Counter( 'cache_operations_total', 'Total cache operations', ['operation', 'status'] ) CACHE_HIT_RATIO = Gauge( 'cache_hit_ratio', 'Cache hit ratio' ) CACHE_KEYS_TOTAL = Gauge( 'cache_keys_total', 'Total number of cache keys' ) CACHE_MEMORY_USAGE_BYTES = Gauge( 'cache_memory_usage_bytes', 'Cache memory usage in bytes' ) # Storage metrics STORAGE_OPERATIONS_TOTAL = Counter( 'storage_operations_total', 'Total storage operations', ['operation', 'backend', 'status'] ) STORAGE_AVAILABLE_BYTES = Gauge( 'storage_available_bytes', 'Available storage space in bytes', ['backend'] ) STORAGE_TOTAL_BYTES = Gauge( 'storage_total_bytes', 'Total storage space in bytes', ['backend'] ) STORAGE_FILES_TOTAL = Gauge( 'storage_files_total', 'Total number of 
# Blockchain metrics
BLOCKCHAIN_TRANSACTIONS_TOTAL = Counter(
    'blockchain_transactions_total',
    'Total blockchain transactions',
    ['status', 'network']
)

BLOCKCHAIN_TRANSACTION_FEES = Histogram(
    'blockchain_transaction_fees',
    'Blockchain transaction fees',
    ['network']
)

BLOCKCHAIN_PENDING_TRANSACTIONS = Gauge(
    'blockchain_pending_transactions',
    'Number of pending blockchain transactions'
)

BLOCKCHAIN_WALLET_BALANCES = Gauge(
    'blockchain_wallet_balances',
    'Wallet balances',
    ['wallet_id', 'currency']
)

TON_SERVICE_UP = Gauge(
    'ton_service_up',
    'TON service availability (1 = up, 0 = down)'
)

# Security metrics
RATE_LIMIT_HITS_TOTAL = Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'user_id']
)

SECURITY_EVENTS_TOTAL = Counter(
    'security_events_total',
    'Total security events',
    ['event_type', 'severity']
)

SECURITY_SUSPICIOUS_EVENTS = Gauge(
    'security_suspicious_events',
    'Number of suspicious security events in the last hour'
)

FAILED_LOGIN_ATTEMPTS = Counter(
    'failed_login_attempts_total',
    'Total failed login attempts',
    ['ip_address', 'reason']
)

# System metrics
SYSTEM_UPTIME_SECONDS = Gauge(
    'system_uptime_seconds',
    'System uptime in seconds'
)

BACKGROUND_TASKS_ACTIVE = Gauge(
    'background_tasks_active',
    'Number of active background tasks',
    ['service']
)

BACKGROUND_TASKS_COMPLETED = Counter(
    'background_tasks_completed_total',
    'Total completed background tasks',
    ['service', 'status']
)

# Error metrics
ERROR_RATE = Gauge(
    'error_rate',
    'Application error rate'
)

EXCEPTIONS_TOTAL = Counter(
    'exceptions_total',
    'Total exceptions',
    ['exception_type', 'handler']
)
class MetricsCollector:
    """Centralized metrics collection and management."""

    def __init__(self):
        self.start_time = time.time()
        self._cache_stats = {
            'hits': 0,
            'misses': 0,
            'operations': 0
        }

    def record_http_request(
        self,
        method: str,
        endpoint: str,
        status_code: int,
        duration: float,
        request_size: int = 0,
        response_size: int = 0
    ):
        """Record HTTP request metrics."""
        HTTP_REQUESTS_TOTAL.labels(
            method=method,
            endpoint=endpoint,
            status_code=status_code
        ).inc()

        HTTP_REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)

        if request_size > 0:
            HTTP_REQUEST_SIZE.labels(
                method=method,
                endpoint=endpoint
            ).observe(request_size)

        if response_size > 0:
            HTTP_RESPONSE_SIZE.labels(
                method=method,
                endpoint=endpoint
            ).observe(response_size)

    def record_auth_event(self, event_type: str, status: str, **labels):
        """Record authentication events."""
        if event_type == 'login':
            AUTH_LOGIN_ATTEMPTS_TOTAL.labels(status=status).inc()
            if status == 'failed':
                reason = labels.get('reason', 'unknown')
                AUTH_LOGIN_FAILURES_TOTAL.labels(reason=reason).inc()
        elif event_type == 'api_key':
            key_id = labels.get('key_id', 'unknown')
            AUTH_API_KEY_USAGE_TOTAL.labels(key_id=key_id, status=status).inc()

    def record_upload_event(
        self,
        status: str,
        file_type: str,
        file_size: int = 0,
        duration: float = 0,
        **kwargs
    ):
        """Record file upload events."""
        UPLOAD_REQUESTS_TOTAL.labels(status=status, file_type=file_type).inc()

        if file_size > 0:
            UPLOAD_SIZE_BYTES.labels(file_type=file_type).observe(file_size)

        if duration > 0:
            UPLOAD_DURATION_SECONDS.labels(file_type=file_type).observe(duration)

        if status == 'failed':
            reason = kwargs.get('reason', 'unknown')
            UPLOAD_FAILURES_TOTAL.labels(reason=reason, file_type=file_type).inc()

    def record_processing_event(
        self,
        file_type: str,
        operation: str,
        duration: float = 0,
        status: str = 'success'
    ):
        """Record file processing events."""
        if duration > 0:
            PROCESSING_DURATION_SECONDS.labels(
                file_type=file_type,
                operation=operation
            ).observe(duration)

        if status == 'failed':
            PROCESSING_FAILURES_TOTAL.labels(
                file_type=file_type,
                operation=operation
            ).inc()

    def record_db_event(self, operation: str, duration: float = 0, status: str = 'success'):
        """Record database events."""
        if duration > 0:
            DB_QUERY_DURATION_SECONDS.labels(operation=operation).observe(duration)

        DB_TRANSACTIONS_TOTAL.labels(status=status).inc()

    def record_cache_event(self, operation: str, status: str):
        """Record cache events."""
        CACHE_OPERATIONS_TOTAL.labels(operation=operation, status=status).inc()

        # Update cache stats
        self._cache_stats['operations'] += 1
        if status == 'hit':
            self._cache_stats['hits'] += 1
        elif status == 'miss':
            self._cache_stats['misses'] += 1

        # Update hit ratio
        if self._cache_stats['operations'] > 0:
            hit_ratio = self._cache_stats['hits'] / self._cache_stats['operations']
            CACHE_HIT_RATIO.set(hit_ratio)

    def record_blockchain_event(
        self,
        event_type: str,
        status: str,
        network: str = 'mainnet',
        **kwargs
    ):
        """Record blockchain events."""
        if event_type == 'transaction':
            BLOCKCHAIN_TRANSACTIONS_TOTAL.labels(status=status, network=network).inc()
            if 'fee' in kwargs:
                BLOCKCHAIN_TRANSACTION_FEES.labels(network=network).observe(kwargs['fee'])

    def record_security_event(self, event_type: str, severity: str = 'info', **kwargs):
        """Record security events."""
        SECURITY_EVENTS_TOTAL.labels(event_type=event_type, severity=severity).inc()

        if event_type == 'rate_limit':
            endpoint = kwargs.get('endpoint', 'unknown')
            user_id = kwargs.get('user_id', 'anonymous')
            RATE_LIMIT_HITS_TOTAL.labels(endpoint=endpoint, user_id=user_id).inc()
        elif event_type == 'failed_login':
            ip_address = kwargs.get('ip_address', 'unknown')
            reason = kwargs.get('reason', 'unknown')
            FAILED_LOGIN_ATTEMPTS.labels(ip_address=ip_address, reason=reason).inc()

    def update_system_metrics(self):
        """Update system-level metrics."""
        uptime = time.time() - self.start_time
        SYSTEM_UPTIME_SECONDS.set(uptime)

    def update_gauge_metrics(self, metrics_data: Dict[str, Any]):
        """Update gauge metrics from external data."""
        # Database metrics
        if 'db_connections' in metrics_data:
            db_conn = metrics_data['db_connections']
            DB_CONNECTIONS_ACTIVE.set(db_conn.get('active', 0))
            DB_CONNECTIONS_IDLE.set(db_conn.get('idle', 0))

        # Cache metrics
        if 'cache' in metrics_data:
            cache_data = metrics_data['cache']
            CACHE_KEYS_TOTAL.set(cache_data.get('keys', 0))
            CACHE_MEMORY_USAGE_BYTES.set(cache_data.get('memory_usage', 0))

        # Storage metrics
        if 'storage' in metrics_data:
            storage_data = metrics_data['storage']
            for backend, data in storage_data.items():
                STORAGE_AVAILABLE_BYTES.labels(backend=backend).set(data.get('available', 0))
                STORAGE_TOTAL_BYTES.labels(backend=backend).set(data.get('total', 0))
                STORAGE_FILES_TOTAL.labels(backend=backend).set(data.get('files', 0))

        # Queue metrics
        if 'queues' in metrics_data:
            queues = metrics_data['queues']
            UPLOAD_QUEUE_SIZE.set(queues.get('upload', 0))
            PROCESSING_QUEUE_SIZE.set(queues.get('processing', 0))

        # Blockchain metrics
        if 'blockchain' in metrics_data:
            blockchain_data = metrics_data['blockchain']
            BLOCKCHAIN_PENDING_TRANSACTIONS.set(blockchain_data.get('pending_transactions', 0))
            TON_SERVICE_UP.set(1 if blockchain_data.get('ton_service_up') else 0)

            # Wallet balances
            for wallet_id, balance_data in blockchain_data.get('wallet_balances', {}).items():
                for currency, balance in balance_data.items():
                    BLOCKCHAIN_WALLET_BALANCES.labels(
                        wallet_id=wallet_id,
                        currency=currency
                    ).set(balance)

        # Background tasks
        if 'background_tasks' in metrics_data:
            tasks_data = metrics_data['background_tasks']
            for service, count in tasks_data.items():
                BACKGROUND_TASKS_ACTIVE.labels(service=service).set(count)
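
# A representative update_gauge_metrics() payload (a sketch, not a fixed
# schema: the exact keys, backends, and services are whatever the caller's
# stats collectors produce):
#
#     metrics_collector.update_gauge_metrics({
#         'db_connections': {'active': 5, 'idle': 10},
#         'cache': {'keys': 1024, 'memory_usage': 8_388_608},
#         'storage': {'local': {'available': 10**9, 'total': 5 * 10**9, 'files': 42}},
#         'queues': {'upload': 3, 'processing': 1},
#         'blockchain': {
#             'pending_transactions': 2,
#             'ton_service_up': True,
#             'wallet_balances': {'hot': {'TON': 12.5}},
#         },
#         'background_tasks': {'uploader': 4},
#     })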

# Global metrics collector instance
metrics_collector = MetricsCollector()


async def metrics_middleware(request: Request, call_next):
    """FastAPI middleware to collect HTTP metrics."""
    start_time = time.time()

    # Process request
    response = await call_next(request)

    # Calculate duration
    duration = time.time() - start_time

    # Get endpoint info
    endpoint = str(request.url.path)
    method = request.method
    status_code = response.status_code

    # Best-effort size detection: Starlette/FastAPI does not expose request or
    # response bodies directly in middleware, so fall back to the cached
    # attributes when they happen to be present.
    request_size = 0
    if hasattr(request, '_body'):
        request_size = len(request._body)

    response_size = 0
    if hasattr(response, 'body'):
        response_size = len(response.body) if response.body else 0

    # Record metrics
    metrics_collector.record_http_request(
        method=method,
        endpoint=endpoint,
        status_code=status_code,
        duration=duration,
        request_size=request_size,
        response_size=response_size
    )

    return response


def track_function_calls(func_name: str, labels: Optional[Dict[str, str]] = None):
    """Decorator to track function call metrics.

    ``labels`` is accepted but not currently used.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            status = 'success'
            try:
                result = await func(*args, **kwargs)
                return result
            except Exception as e:
                status = 'error'
                EXCEPTIONS_TOTAL.labels(
                    exception_type=type(e).__name__,
                    handler=func_name
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                # Record custom metrics based on function type
                if func_name.startswith('db_'):
                    metrics_collector.record_db_event(func_name, duration, status)
                elif func_name.startswith('cache_'):
                    metrics_collector.record_cache_event(func_name, status)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time()
            status = 'success'
            try:
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                status = 'error'
                EXCEPTIONS_TOTAL.labels(
                    exception_type=type(e).__name__,
                    handler=func_name
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                # Record custom metrics based on function type
                if func_name.startswith('db_'):
                    metrics_collector.record_db_event(func_name, duration, status)
                elif func_name.startswith('cache_'):
                    metrics_collector.record_cache_event(func_name, status)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator


async def get_metrics():
    """Get Prometheus metrics."""
    # Update system metrics before generating output
    metrics_collector.update_system_metrics()

    # Generate metrics in Prometheus format
    return generate_latest()


def get_metrics_content_type():
    """Get the content type for metrics."""
    return CONTENT_TYPE_LATEST
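
# ---------------------------------------------------------------------------
# Minimal wiring sketch (illustrative, not part of this module's public API):
# shows how an application is expected to hook up the middleware, expose a
# /metrics endpoint, and apply the call-tracking decorator. The `app`,
# `metrics_endpoint`, and `fetch_user` names below are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    from fastapi import FastAPI
    from fastapi.responses import Response

    app = FastAPI()

    # Register the metrics middleware for all HTTP requests.
    app.middleware('http')(metrics_middleware)

    @app.get('/metrics')
    async def metrics_endpoint():
        # Serve metrics in the Prometheus text exposition format.
        payload = await get_metrics()
        return Response(content=payload, media_type=get_metrics_content_type())

    # The 'db_' prefix routes the timing into record_db_event; the helper
    # itself is a stand-in for a real database call.
    @track_function_calls('db_fetch_user')
    async def fetch_user(user_id: int) -> dict:
        return {'id': user_id}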