"""Prometheus metrics collection for my-uploader-bot."""
|
|
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from typing import Dict, Any, Optional, Callable
|
|
|
|
from prometheus_client import Counter, Histogram, Gauge, Info, generate_latest, CONTENT_TYPE_LATEST
|
|
from sanic import Request, Response
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Application info
APP_INFO = Info('myuploader_app_info', 'Application information')
APP_INFO.info({
    'version': '2.0.0',
    'name': 'my-uploader-bot',
    'python_version': '3.11+'
})

# HTTP request metrics
HTTP_REQUESTS_TOTAL = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code']
)

HTTP_REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration in seconds',
    ['method', 'endpoint']
)

HTTP_REQUEST_SIZE = Histogram(
    'http_request_size_bytes',
    'HTTP request size in bytes',
    ['method', 'endpoint']
)

HTTP_RESPONSE_SIZE = Histogram(
    'http_response_size_bytes',
    'HTTP response size in bytes',
    ['method', 'endpoint']
)

# Authentication metrics
AUTH_LOGIN_ATTEMPTS_TOTAL = Counter(
    'auth_login_attempts_total',
    'Total login attempts',
    ['status']
)

AUTH_LOGIN_FAILURES_TOTAL = Counter(
    'auth_login_failures_total',
    'Total login failures',
    ['reason']
)

AUTH_API_KEY_USAGE_TOTAL = Counter(
    'auth_api_key_usage_total',
    'Total API key usage',
    ['key_id', 'status']
)

# File upload metrics
UPLOAD_REQUESTS_TOTAL = Counter(
    'upload_requests_total',
    'Total upload requests',
    ['status', 'file_type']
)

UPLOAD_SIZE_BYTES = Histogram(
    'upload_size_bytes',
    'File upload size in bytes',
    ['file_type']
)

UPLOAD_DURATION_SECONDS = Histogram(
    'upload_duration_seconds',
    'File upload duration in seconds',
    ['file_type']
)

UPLOAD_QUEUE_SIZE = Gauge(
    'upload_queue_size',
    'Number of files in upload queue'
)

UPLOAD_FAILURES_TOTAL = Counter(
    'upload_failures_total',
    'Total upload failures',
    ['reason', 'file_type']
)

# File processing metrics
PROCESSING_QUEUE_SIZE = Gauge(
    'processing_queue_size',
    'Number of files in processing queue'
)

PROCESSING_DURATION_SECONDS = Histogram(
    'processing_duration_seconds',
    'File processing duration in seconds',
    ['file_type', 'operation']
)

PROCESSING_FAILURES_TOTAL = Counter(
    'processing_failures_total',
    'Total processing failures',
    ['file_type', 'operation']
)

# Database metrics
DB_CONNECTIONS_ACTIVE = Gauge(
    'db_connections_active',
    'Number of active database connections'
)

DB_CONNECTIONS_IDLE = Gauge(
    'db_connections_idle',
    'Number of idle database connections'
)

DB_QUERY_DURATION_SECONDS = Histogram(
    'db_query_duration_seconds',
    'Database query duration in seconds',
    ['operation']
)

DB_TRANSACTIONS_TOTAL = Counter(
    'db_transactions_total',
    'Total database transactions',
    ['status']
)

# Cache metrics
CACHE_OPERATIONS_TOTAL = Counter(
    'cache_operations_total',
    'Total cache operations',
    ['operation', 'status']
)

CACHE_HIT_RATIO = Gauge(
    'cache_hit_ratio',
    'Cache hit ratio'
)

CACHE_KEYS_TOTAL = Gauge(
    'cache_keys_total',
    'Total number of cache keys'
)

CACHE_MEMORY_USAGE_BYTES = Gauge(
    'cache_memory_usage_bytes',
    'Cache memory usage in bytes'
)

# Storage metrics
STORAGE_OPERATIONS_TOTAL = Counter(
    'storage_operations_total',
    'Total storage operations',
    ['operation', 'backend', 'status']
)

STORAGE_AVAILABLE_BYTES = Gauge(
    'storage_available_bytes',
    'Available storage space in bytes',
    ['backend']
)

STORAGE_TOTAL_BYTES = Gauge(
    'storage_total_bytes',
    'Total storage space in bytes',
    ['backend']
)

STORAGE_FILES_TOTAL = Gauge(
    'storage_files_total',
    'Total number of stored files',
    ['backend']
)

# Blockchain metrics
BLOCKCHAIN_TRANSACTIONS_TOTAL = Counter(
    'blockchain_transactions_total',
    'Total blockchain transactions',
    ['status', 'network']
)

BLOCKCHAIN_TRANSACTION_FEES = Histogram(
    'blockchain_transaction_fees',
    'Blockchain transaction fees',
    ['network']
)

BLOCKCHAIN_PENDING_TRANSACTIONS = Gauge(
    'blockchain_pending_transactions',
    'Number of pending blockchain transactions'
)

BLOCKCHAIN_WALLET_BALANCES = Gauge(
    'blockchain_wallet_balances',
    'Wallet balances',
    ['wallet_id', 'currency']
)

TON_SERVICE_UP = Gauge(
    'ton_service_up',
    'TON service availability (1 = up, 0 = down)'
)

# Security metrics
RATE_LIMIT_HITS_TOTAL = Counter(
    'rate_limit_hits_total',
    'Total rate limit hits',
    ['endpoint', 'user_id']
)

SECURITY_EVENTS_TOTAL = Counter(
    'security_events_total',
    'Total security events',
    ['event_type', 'severity']
)

SECURITY_SUSPICIOUS_EVENTS = Gauge(
    'security_suspicious_events',
    'Number of suspicious security events in the last hour'
)

FAILED_LOGIN_ATTEMPTS = Counter(
    'failed_login_attempts_total',
    'Total failed login attempts',
    ['ip_address', 'reason']
)

# System metrics
SYSTEM_UPTIME_SECONDS = Gauge(
    'system_uptime_seconds',
    'System uptime in seconds'
)

BACKGROUND_TASKS_ACTIVE = Gauge(
    'background_tasks_active',
    'Number of active background tasks',
    ['service']
)

BACKGROUND_TASKS_COMPLETED = Counter(
    'background_tasks_completed_total',
    'Total completed background tasks',
    ['service', 'status']
)

# Error metrics
ERROR_RATE = Gauge(
    'error_rate',
    'Application error rate'
)

EXCEPTIONS_TOTAL = Counter(
    'exceptions_total',
    'Total exceptions',
    ['exception_type', 'handler']
)

class MetricsCollector:
    """Centralized metrics collection and management."""

    def __init__(self):
        self.start_time = time.time()
        self._cache_stats = {
            'hits': 0,
            'misses': 0,
            'operations': 0
        }

    def record_http_request(
        self,
        method: str,
        endpoint: str,
        status_code: int,
        duration: float,
        request_size: int = 0,
        response_size: int = 0
    ):
        """Record HTTP request metrics."""
        HTTP_REQUESTS_TOTAL.labels(
            method=method,
            endpoint=endpoint,
            status_code=status_code
        ).inc()

        HTTP_REQUEST_DURATION.labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)

        if request_size > 0:
            HTTP_REQUEST_SIZE.labels(
                method=method,
                endpoint=endpoint
            ).observe(request_size)

        if response_size > 0:
            HTTP_RESPONSE_SIZE.labels(
                method=method,
                endpoint=endpoint
            ).observe(response_size)

    def record_auth_event(self, event_type: str, status: str, **labels):
        """Record authentication events."""
        if event_type == 'login':
            AUTH_LOGIN_ATTEMPTS_TOTAL.labels(status=status).inc()
            if status == 'failed':
                reason = labels.get('reason', 'unknown')
                AUTH_LOGIN_FAILURES_TOTAL.labels(reason=reason).inc()
        elif event_type == 'api_key':
            key_id = labels.get('key_id', 'unknown')
            AUTH_API_KEY_USAGE_TOTAL.labels(key_id=key_id, status=status).inc()

    def record_upload_event(
        self,
        status: str,
        file_type: str,
        file_size: int = 0,
        duration: float = 0,
        **kwargs
    ):
        """Record file upload events."""
        UPLOAD_REQUESTS_TOTAL.labels(status=status, file_type=file_type).inc()

        if file_size > 0:
            UPLOAD_SIZE_BYTES.labels(file_type=file_type).observe(file_size)

        if duration > 0:
            UPLOAD_DURATION_SECONDS.labels(file_type=file_type).observe(duration)

        if status == 'failed':
            reason = kwargs.get('reason', 'unknown')
            UPLOAD_FAILURES_TOTAL.labels(reason=reason, file_type=file_type).inc()

    def record_processing_event(
        self,
        file_type: str,
        operation: str,
        duration: float = 0,
        status: str = 'success'
    ):
        """Record file processing events."""
        if duration > 0:
            PROCESSING_DURATION_SECONDS.labels(
                file_type=file_type,
                operation=operation
            ).observe(duration)

        if status == 'failed':
            PROCESSING_FAILURES_TOTAL.labels(
                file_type=file_type,
                operation=operation
            ).inc()

    def record_db_event(self, operation: str, duration: float = 0, status: str = 'success'):
        """Record database events."""
        if duration > 0:
            DB_QUERY_DURATION_SECONDS.labels(operation=operation).observe(duration)

        DB_TRANSACTIONS_TOTAL.labels(status=status).inc()

    def record_cache_event(self, operation: str, status: str):
        """Record cache events."""
        CACHE_OPERATIONS_TOTAL.labels(operation=operation, status=status).inc()

        # Update cache stats
        self._cache_stats['operations'] += 1
        if status == 'hit':
            self._cache_stats['hits'] += 1
        elif status == 'miss':
            self._cache_stats['misses'] += 1

        # Update hit ratio over lookups only (hits + misses), so that writes
        # and deletes do not dilute the ratio
        lookups = self._cache_stats['hits'] + self._cache_stats['misses']
        if lookups > 0:
            CACHE_HIT_RATIO.set(self._cache_stats['hits'] / lookups)

    def record_blockchain_event(
        self,
        event_type: str,
        status: str,
        network: str = 'mainnet',
        **kwargs
    ):
        """Record blockchain events."""
        if event_type == 'transaction':
            BLOCKCHAIN_TRANSACTIONS_TOTAL.labels(status=status, network=network).inc()

            if 'fee' in kwargs:
                BLOCKCHAIN_TRANSACTION_FEES.labels(network=network).observe(kwargs['fee'])

    def record_security_event(self, event_type: str, severity: str = 'info', **kwargs):
        """Record security events."""
        SECURITY_EVENTS_TOTAL.labels(event_type=event_type, severity=severity).inc()

        if event_type == 'rate_limit':
            endpoint = kwargs.get('endpoint', 'unknown')
            user_id = kwargs.get('user_id', 'anonymous')
            RATE_LIMIT_HITS_TOTAL.labels(endpoint=endpoint, user_id=user_id).inc()

        elif event_type == 'failed_login':
            ip_address = kwargs.get('ip_address', 'unknown')
            reason = kwargs.get('reason', 'unknown')
            FAILED_LOGIN_ATTEMPTS.labels(ip_address=ip_address, reason=reason).inc()

    def update_system_metrics(self):
        """Update system-level metrics."""
        uptime = time.time() - self.start_time
        SYSTEM_UPTIME_SECONDS.set(uptime)

    def update_gauge_metrics(self, metrics_data: Dict[str, Any]):
        """Update gauge metrics from external data."""
        # Database metrics
        if 'db_connections' in metrics_data:
            db_conn = metrics_data['db_connections']
            DB_CONNECTIONS_ACTIVE.set(db_conn.get('active', 0))
            DB_CONNECTIONS_IDLE.set(db_conn.get('idle', 0))

        # Cache metrics
        if 'cache' in metrics_data:
            cache_data = metrics_data['cache']
            CACHE_KEYS_TOTAL.set(cache_data.get('keys', 0))
            CACHE_MEMORY_USAGE_BYTES.set(cache_data.get('memory_usage', 0))

        # Storage metrics
        if 'storage' in metrics_data:
            storage_data = metrics_data['storage']
            for backend, data in storage_data.items():
                STORAGE_AVAILABLE_BYTES.labels(backend=backend).set(data.get('available', 0))
                STORAGE_TOTAL_BYTES.labels(backend=backend).set(data.get('total', 0))
                STORAGE_FILES_TOTAL.labels(backend=backend).set(data.get('files', 0))

        # Queue metrics
        if 'queues' in metrics_data:
            queues = metrics_data['queues']
            UPLOAD_QUEUE_SIZE.set(queues.get('upload', 0))
            PROCESSING_QUEUE_SIZE.set(queues.get('processing', 0))

        # Blockchain metrics
        if 'blockchain' in metrics_data:
            blockchain_data = metrics_data['blockchain']
            BLOCKCHAIN_PENDING_TRANSACTIONS.set(blockchain_data.get('pending_transactions', 0))
            TON_SERVICE_UP.set(1 if blockchain_data.get('ton_service_up') else 0)

            # Wallet balances
            for wallet_id, balance_data in blockchain_data.get('wallet_balances', {}).items():
                for currency, balance in balance_data.items():
                    BLOCKCHAIN_WALLET_BALANCES.labels(
                        wallet_id=wallet_id,
                        currency=currency
                    ).set(balance)

        # Background tasks
        if 'background_tasks' in metrics_data:
            tasks_data = metrics_data['background_tasks']
            for service, count in tasks_data.items():
                BACKGROUND_TASKS_ACTIVE.labels(service=service).set(count)
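

# Expected shape of the `metrics_data` dict consumed by
# MetricsCollector.update_gauge_metrics, reconstructed from the keys the
# method reads (the values below are illustrative only):
#
#     {
#         'db_connections': {'active': 5, 'idle': 10},
#         'cache': {'keys': 1200, 'memory_usage': 52_428_800},
#         'storage': {'local': {'available': 10**9, 'total': 10**10, 'files': 345}},
#         'queues': {'upload': 3, 'processing': 1},
#         'blockchain': {
#             'pending_transactions': 2,
#             'ton_service_up': True,
#             'wallet_balances': {'hot_wallet': {'TON': 12.5}},
#         },
#         'background_tasks': {'cleanup': 1, 'indexer': 2},
#     }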


# Global metrics collector instance
metrics_collector = MetricsCollector()
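
# A minimal usage sketch (hypothetical call sites; the real handlers live
# elsewhere in the project):
#
#     metrics_collector.record_upload_event(
#         status='success', file_type='image', file_size=1_048_576, duration=0.42
#     )
#     metrics_collector.record_auth_event('login', 'failed', reason='bad_password')
#     metrics_collector.record_blockchain_event('transaction', 'confirmed', fee=0.005)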


def request_metrics_middleware(request: Request):
    """Request-phase middleware: stamp the start time for duration tracking."""
    request.ctx.metrics_start_time = time.time()


def metrics_middleware(request: Request, response: Response):
    """Response-phase middleware to collect HTTP metrics."""
    # Measure duration from the timestamp set by the request-phase middleware
    # above; taking time.time() twice inside this handler would always yield
    # a duration of ~0. Fall back to zero if the timestamp is missing.
    start_time = getattr(request.ctx, 'metrics_start_time', None)
    duration = time.time() - start_time if start_time is not None else 0.0

    # Get endpoint info
    endpoint = request.path
    method = request.method
    status_code = response.status

    # Get request/response sizes
    request_size = len(request.body) if request.body else 0
    response_size = len(response.body) if hasattr(response, 'body') and response.body else 0

    # Record metrics
    metrics_collector.record_http_request(
        method=method,
        endpoint=endpoint,
        status_code=status_code,
        duration=duration,
        request_size=request_size,
        response_size=response_size
    )


def track_function_calls(func_name: str, labels: Optional[Dict[str, str]] = None):
    """Decorator to track function call metrics."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            status = 'success'

            try:
                return await func(*args, **kwargs)
            except Exception as e:
                status = 'error'
                EXCEPTIONS_TOTAL.labels(
                    exception_type=type(e).__name__,
                    handler=func_name
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                # Record custom metrics based on function type
                if func_name.startswith('db_'):
                    metrics_collector.record_db_event(func_name, duration, status)
                elif func_name.startswith('cache_'):
                    metrics_collector.record_cache_event(func_name, status)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time()
            status = 'success'

            try:
                return func(*args, **kwargs)
            except Exception as e:
                status = 'error'
                EXCEPTIONS_TOTAL.labels(
                    exception_type=type(e).__name__,
                    handler=func_name
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                # Record custom metrics based on function type
                if func_name.startswith('db_'):
                    metrics_collector.record_db_event(func_name, duration, status)
                elif func_name.startswith('cache_'):
                    metrics_collector.record_cache_event(func_name, status)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
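
# A minimal usage sketch (hypothetical function name, for illustration only):
#
#     @track_function_calls('db_fetch_user')
#     async def db_fetch_user(user_id: int):
#         ...
#
# The 'db_' prefix routes the timing into record_db_event; a 'cache_' prefix
# would route into record_cache_event instead.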


async def get_metrics():
    """Get Prometheus metrics."""
    # Update system metrics before generating output
    metrics_collector.update_system_metrics()

    # Generate metrics in Prometheus format
    return generate_latest()


def get_metrics_content_type():
    """Get the content type for metrics."""
    return CONTENT_TYPE_LATEST
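
# A minimal wiring sketch (assumes a Sanic app object created elsewhere in the
# project; the app and handler names here are illustrative, not the actual
# application code):
#
#     from sanic import Sanic, response
#
#     app = Sanic('my-uploader-bot')
#     app.register_middleware(request_metrics_middleware, 'request')
#     app.register_middleware(metrics_middleware, 'response')
#
#     @app.get('/metrics')
#     async def metrics_endpoint(request):
#         payload = await get_metrics()
#         return response.raw(payload, content_type=get_metrics_content_type())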