# uploader-bot/app/core/metrics.py
"""Prometheus metrics collection for my-uploader-bot."""
import asyncio
import logging
import time
from functools import wraps
from typing import Dict, Any, Optional, Callable

from prometheus_client import Counter, Histogram, Gauge, Info, generate_latest, CONTENT_TYPE_LATEST
from sanic import Request, Response

logger = logging.getLogger(__name__)

# Application info
APP_INFO = Info('myuploader_app_info', 'Application information')
APP_INFO.info({
    'version': '2.0.0',
    'name': 'my-uploader-bot',
    'python_version': '3.11+'
})

# HTTP request metrics
HTTP_REQUESTS_TOTAL = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code']
)
HTTP_REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration in seconds',
    ['method', 'endpoint']
)
HTTP_REQUEST_SIZE = Histogram(
    'http_request_size_bytes',
    'HTTP request size in bytes',
    ['method', 'endpoint']
)
HTTP_RESPONSE_SIZE = Histogram(
    'http_response_size_bytes',
    'HTTP response size in bytes',
    ['method', 'endpoint']
)

# Authentication metrics
AUTH_LOGIN_ATTEMPTS_TOTAL = Counter(
    'auth_login_attempts_total',
    'Total login attempts',
    ['status']
)
AUTH_LOGIN_FAILURES_TOTAL = Counter(
    'auth_login_failures_total',
    'Total login failures',
    ['reason']
)
AUTH_API_KEY_USAGE_TOTAL = Counter(
    'auth_api_key_usage_total',
    'Total API key usage',
    ['key_id', 'status']
)

# File upload metrics
UPLOAD_REQUESTS_TOTAL = Counter(
    'upload_requests_total',
    'Total upload requests',
    ['status', 'file_type']
)
UPLOAD_SIZE_BYTES = Histogram(
    'upload_size_bytes',
    'File upload size in bytes',
    ['file_type']
)
UPLOAD_DURATION_SECONDS = Histogram(
    'upload_duration_seconds',
    'File upload duration in seconds',
    ['file_type']
)
UPLOAD_QUEUE_SIZE = Gauge(
    'upload_queue_size',
    'Number of files in upload queue'
)
UPLOAD_FAILURES_TOTAL = Counter(
    'upload_failures_total',
    'Total upload failures',
    ['reason', 'file_type']
)

# File processing metrics
PROCESSING_QUEUE_SIZE = Gauge(
    'processing_queue_size',
    'Number of files in processing queue'
)
PROCESSING_DURATION_SECONDS = Histogram(
    'processing_duration_seconds',
    'File processing duration in seconds',
    ['file_type', 'operation']
)
PROCESSING_FAILURES_TOTAL = Counter(
    'processing_failures_total',
    'Total processing failures',
    ['file_type', 'operation']
)

# Database metrics
DB_CONNECTIONS_ACTIVE = Gauge(
    'db_connections_active',
    'Number of active database connections'
)
DB_CONNECTIONS_IDLE = Gauge(
    'db_connections_idle',
    'Number of idle database connections'
)
DB_QUERY_DURATION_SECONDS = Histogram(
    'db_query_duration_seconds',
    'Database query duration in seconds',
    ['operation']
)
DB_TRANSACTIONS_TOTAL = Counter(
    'db_transactions_total',
    'Total database transactions',
    ['status']
)

# Cache metrics
CACHE_OPERATIONS_TOTAL = Counter(
    'cache_operations_total',
    'Total cache operations',
    ['operation', 'status']
)
CACHE_HIT_RATIO = Gauge(
    'cache_hit_ratio',
    'Cache hit ratio'
)
CACHE_KEYS_TOTAL = Gauge(
    'cache_keys_total',
    'Total number of cache keys'
)
CACHE_MEMORY_USAGE_BYTES = Gauge(
    'cache_memory_usage_bytes',
    'Cache memory usage in bytes'
)

# Storage metrics
STORAGE_OPERATIONS_TOTAL = Counter(
    'storage_operations_total',
    'Total storage operations',
    ['operation', 'backend', 'status']
)
STORAGE_AVAILABLE_BYTES = Gauge(
    'storage_available_bytes',
    'Available storage space in bytes',
    ['backend']
)
STORAGE_TOTAL_BYTES = Gauge(
    'storage_total_bytes',
    'Total storage space in bytes',
    ['backend']
)
STORAGE_FILES_TOTAL = Gauge(
    'storage_files_total',
    'Total number of stored files',
    ['backend']
)

# Blockchain metrics
BLOCKCHAIN_TRANSACTIONS_TOTAL = Counter(
    'blockchain_transactions_total',
    'Total blockchain transactions',
    ['status', 'network']
)
BLOCKCHAIN_TRANSACTION_FEES = Histogram(
    'blockchain_transaction_fees',
    'Blockchain transaction fees',
    ['network']
)
BLOCKCHAIN_PENDING_TRANSACTIONS = Gauge(
    'blockchain_pending_transactions',
    'Number of pending blockchain transactions'
)
BLOCKCHAIN_WALLET_BALANCES = Gauge(
    'blockchain_wallet_balances',
    'Wallet balances',
    ['wallet_id', 'currency']
)
TON_SERVICE_UP = Gauge(
    'ton_service_up',
    'TON service availability (1 = up, 0 = down)'
)

# Security metrics
# NOTE: unbounded label values such as user_id and ip_address can inflate the
# number of time series; consider bucketing them in high-traffic deployments.
RATE_LIMIT_HITS_TOTAL = Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'user_id']
)
SECURITY_EVENTS_TOTAL = Counter(
    'security_events_total',
    'Total security events',
    ['event_type', 'severity']
)
SECURITY_SUSPICIOUS_EVENTS = Gauge(
    'security_suspicious_events',
    'Number of suspicious security events in the last hour'
)
FAILED_LOGIN_ATTEMPTS = Counter(
    'failed_login_attempts_total',
    'Total failed login attempts',
    ['ip_address', 'reason']
)

# System metrics
SYSTEM_UPTIME_SECONDS = Gauge(
    'system_uptime_seconds',
    'System uptime in seconds'
)
BACKGROUND_TASKS_ACTIVE = Gauge(
    'background_tasks_active',
    'Number of active background tasks',
    ['service']
)
BACKGROUND_TASKS_COMPLETED = Counter(
    'background_tasks_completed_total',
    'Total completed background tasks',
    ['service', 'status']
)

# Error metrics
ERROR_RATE = Gauge(
    'error_rate',
    'Application error rate'
)
EXCEPTIONS_TOTAL = Counter(
    'exceptions_total',
    'Total exceptions',
    ['exception_type', 'handler']
)
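
# Illustrative exposition samples for the metrics above (values invented;
# the histogram line uses the always-present "+Inf" bucket):
#
#     http_requests_total{endpoint="/upload",method="POST",status_code="200"} 42.0
#     upload_size_bytes_bucket{file_type="image",le="+Inf"} 17.0
#     ton_service_up 1.0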


class MetricsCollector:
    """Centralized metrics collection and management."""

    def __init__(self):
        self.start_time = time.time()
        self._cache_stats = {
            'hits': 0,
            'misses': 0,
            'operations': 0
        }

    def record_http_request(
        self,
        method: str,
        endpoint: str,
        status_code: int,
        duration: float,
        request_size: int = 0,
        response_size: int = 0
    ):
        """Record HTTP request metrics."""
        HTTP_REQUESTS_TOTAL.labels(
            method=method,
            endpoint=endpoint,
            status_code=status_code
        ).inc()
        HTTP_REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)
        if request_size > 0:
            HTTP_REQUEST_SIZE.labels(
                method=method,
                endpoint=endpoint
            ).observe(request_size)
        if response_size > 0:
            HTTP_RESPONSE_SIZE.labels(
                method=method,
                endpoint=endpoint
            ).observe(response_size)

    def record_auth_event(self, event_type: str, status: str, **labels):
        """Record authentication events."""
        if event_type == 'login':
            AUTH_LOGIN_ATTEMPTS_TOTAL.labels(status=status).inc()
            if status == 'failed':
                reason = labels.get('reason', 'unknown')
                AUTH_LOGIN_FAILURES_TOTAL.labels(reason=reason).inc()
        elif event_type == 'api_key':
            key_id = labels.get('key_id', 'unknown')
            AUTH_API_KEY_USAGE_TOTAL.labels(key_id=key_id, status=status).inc()

    def record_upload_event(
        self,
        status: str,
        file_type: str,
        file_size: int = 0,
        duration: float = 0,
        **kwargs
    ):
        """Record file upload events."""
        UPLOAD_REQUESTS_TOTAL.labels(status=status, file_type=file_type).inc()
        if file_size > 0:
            UPLOAD_SIZE_BYTES.labels(file_type=file_type).observe(file_size)
        if duration > 0:
            UPLOAD_DURATION_SECONDS.labels(file_type=file_type).observe(duration)
        if status == 'failed':
            reason = kwargs.get('reason', 'unknown')
            UPLOAD_FAILURES_TOTAL.labels(reason=reason, file_type=file_type).inc()

    def record_processing_event(
        self,
        file_type: str,
        operation: str,
        duration: float = 0,
        status: str = 'success'
    ):
        """Record file processing events."""
        if duration > 0:
            PROCESSING_DURATION_SECONDS.labels(
                file_type=file_type,
                operation=operation
            ).observe(duration)
        if status == 'failed':
            PROCESSING_FAILURES_TOTAL.labels(
                file_type=file_type,
                operation=operation
            ).inc()

    def record_db_event(self, operation: str, duration: float = 0, status: str = 'success'):
        """Record database events."""
        if duration > 0:
            DB_QUERY_DURATION_SECONDS.labels(operation=operation).observe(duration)
        DB_TRANSACTIONS_TOTAL.labels(status=status).inc()

    def record_cache_event(self, operation: str, status: str):
        """Record cache events."""
        CACHE_OPERATIONS_TOTAL.labels(operation=operation, status=status).inc()
        # Update cache stats
        self._cache_stats['operations'] += 1
        if status == 'hit':
            self._cache_stats['hits'] += 1
        elif status == 'miss':
            self._cache_stats['misses'] += 1
        # Update hit ratio
        if self._cache_stats['operations'] > 0:
            hit_ratio = self._cache_stats['hits'] / self._cache_stats['operations']
            CACHE_HIT_RATIO.set(hit_ratio)

    def record_blockchain_event(
        self,
        event_type: str,
        status: str,
        network: str = 'mainnet',
        **kwargs
    ):
        """Record blockchain events."""
        if event_type == 'transaction':
            BLOCKCHAIN_TRANSACTIONS_TOTAL.labels(status=status, network=network).inc()
            if 'fee' in kwargs:
                BLOCKCHAIN_TRANSACTION_FEES.labels(network=network).observe(kwargs['fee'])

    def record_security_event(self, event_type: str, severity: str = 'info', **kwargs):
        """Record security events."""
        SECURITY_EVENTS_TOTAL.labels(event_type=event_type, severity=severity).inc()
        if event_type == 'rate_limit':
            endpoint = kwargs.get('endpoint', 'unknown')
            user_id = kwargs.get('user_id', 'anonymous')
            RATE_LIMIT_HITS_TOTAL.labels(endpoint=endpoint, user_id=user_id).inc()
        elif event_type == 'failed_login':
            ip_address = kwargs.get('ip_address', 'unknown')
            reason = kwargs.get('reason', 'unknown')
            FAILED_LOGIN_ATTEMPTS.labels(ip_address=ip_address, reason=reason).inc()

    def update_system_metrics(self):
        """Update system-level metrics."""
        uptime = time.time() - self.start_time
        SYSTEM_UPTIME_SECONDS.set(uptime)

    def update_gauge_metrics(self, metrics_data: Dict[str, Any]):
        """Update gauge metrics from external data."""
        # Database metrics
        if 'db_connections' in metrics_data:
            db_conn = metrics_data['db_connections']
            DB_CONNECTIONS_ACTIVE.set(db_conn.get('active', 0))
            DB_CONNECTIONS_IDLE.set(db_conn.get('idle', 0))
        # Cache metrics
        if 'cache' in metrics_data:
            cache_data = metrics_data['cache']
            CACHE_KEYS_TOTAL.set(cache_data.get('keys', 0))
            CACHE_MEMORY_USAGE_BYTES.set(cache_data.get('memory_usage', 0))
        # Storage metrics
        if 'storage' in metrics_data:
            storage_data = metrics_data['storage']
            for backend, data in storage_data.items():
                STORAGE_AVAILABLE_BYTES.labels(backend=backend).set(data.get('available', 0))
                STORAGE_TOTAL_BYTES.labels(backend=backend).set(data.get('total', 0))
                STORAGE_FILES_TOTAL.labels(backend=backend).set(data.get('files', 0))
        # Queue metrics
        if 'queues' in metrics_data:
            queues = metrics_data['queues']
            UPLOAD_QUEUE_SIZE.set(queues.get('upload', 0))
            PROCESSING_QUEUE_SIZE.set(queues.get('processing', 0))
        # Blockchain metrics
        if 'blockchain' in metrics_data:
            blockchain_data = metrics_data['blockchain']
            BLOCKCHAIN_PENDING_TRANSACTIONS.set(blockchain_data.get('pending_transactions', 0))
            TON_SERVICE_UP.set(1 if blockchain_data.get('ton_service_up') else 0)
            # Wallet balances
            for wallet_id, balance_data in blockchain_data.get('wallet_balances', {}).items():
                for currency, balance in balance_data.items():
                    BLOCKCHAIN_WALLET_BALANCES.labels(
                        wallet_id=wallet_id,
                        currency=currency
                    ).set(balance)
        # Background tasks
        if 'background_tasks' in metrics_data:
            tasks_data = metrics_data['background_tasks']
            for service, count in tasks_data.items():
                BACKGROUND_TASKS_ACTIVE.labels(service=service).set(count)


# Global metrics collector instance
metrics_collector = MetricsCollector()
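
# Illustrative usage of the shared collector (the call sites are hypothetical,
# but the signatures match the methods defined above):
#
#     metrics_collector.record_upload_event(
#         status='success', file_type='image', file_size=1_048_576, duration=0.42
#     )
#     metrics_collector.record_cache_event(operation='get', status='hit')
#     metrics_collector.record_auth_event('login', 'failed', reason='bad_password')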


def metrics_request_middleware(request: Request):
    """Request middleware: stamp the start time so duration can be measured."""
    request.ctx.metrics_start_time = time.time()


def metrics_response_middleware(request: Request, response: Response):
    """Response middleware: collect HTTP metrics for the completed request."""
    # Duration is measured from the timestamp set by the request middleware;
    # fall back to 0 if it is missing (e.g. the request middleware was skipped).
    start_time = getattr(request.ctx, 'metrics_start_time', None)
    duration = time.time() - start_time if start_time is not None else 0.0
    # Get endpoint info
    endpoint = request.path
    method = request.method
    status_code = response.status
    # Get request/response sizes
    request_size = len(request.body) if request.body else 0
    response_size = len(response.body) if hasattr(response, 'body') and response.body else 0
    # Record metrics
    metrics_collector.record_http_request(
        method=method,
        endpoint=endpoint,
        status_code=status_code,
        duration=duration,
        request_size=request_size,
        response_size=response_size
    )
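
# Wiring sketch (assumes a Sanic app object named `app`; adjust to your setup):
#
#     app.register_middleware(metrics_request_middleware, 'request')
#     app.register_middleware(metrics_response_middleware, 'response')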


def track_function_calls(func_name: str, labels: Optional[Dict[str, str]] = None):
    """Decorator to track function call metrics."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            status = 'success'
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                status = 'error'
                EXCEPTIONS_TOTAL.labels(
                    exception_type=type(e).__name__,
                    handler=func_name
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                # Record custom metrics based on function type
                if func_name.startswith('db_'):
                    metrics_collector.record_db_event(func_name, duration, status)
                elif func_name.startswith('cache_'):
                    metrics_collector.record_cache_event(func_name, status)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time()
            status = 'success'
            try:
                return func(*args, **kwargs)
            except Exception as e:
                status = 'error'
                EXCEPTIONS_TOTAL.labels(
                    exception_type=type(e).__name__,
                    handler=func_name
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                # Record custom metrics based on function type
                if func_name.startswith('db_'):
                    metrics_collector.record_db_event(func_name, duration, status)
                elif func_name.startswith('cache_'):
                    metrics_collector.record_cache_event(func_name, status)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator
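
# Illustrative usage (hypothetical function name; a 'db_' prefix routes the
# timing into record_db_event, a 'cache_' prefix into record_cache_event):
#
#     @track_function_calls('db_get_user')
#     async def db_get_user(user_id: int):
#         ...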


async def get_metrics():
    """Get Prometheus metrics."""
    # Update system metrics before generating output
    metrics_collector.update_system_metrics()
    # Generate metrics in Prometheus format
    return generate_latest()


def get_metrics_content_type():
    """Get the content type for metrics."""
    return CONTENT_TYPE_LATEST
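

# Endpoint sketch (assumes a Sanic app object named `app`; `raw` is the
# standard Sanic helper for returning a bytes body):
#
#     from sanic.response import raw
#
#     @app.get('/metrics')
#     async def metrics_handler(request: Request):
#         return raw(await get_metrics(), content_type=get_metrics_content_type())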