from __future__ import annotations try: from prometheus_client import Counter, Gauge # type: ignore except Exception: class _Metric: def __init__(self, *_, **__): self._values = {} def labels(self, **kwargs): key = tuple(sorted(kwargs.items())) class H: def __init__(self, parent, k): self._p = parent; self._k = k def inc(self, v: float = 1.0): self._p._values[self._k] = self._p._values.get(self._k, 0.0) + v def set(self, v: float): self._p._values[self._k] = v return H(self, key) class Counter(_Metric): pass class Gauge(_Metric): pass replication_under = Counter("dht_replication_under", "Times replication fell below target", ["content_id"]) replication_over = Counter("dht_replication_over", "Times replication exceeded target", ["content_id"]) leader_changes = Counter("dht_leader_changes_total", "Count of leader changes per content", ["content_id"]) merge_conflicts = Counter("dht_merge_conflicts_total", "Number of DHT merge conflicts", ["key"]) view_count_total = Gauge("dht_view_count_total", "Total content views per window", ["content_id", "window"]) unique_estimate = Gauge("dht_unique_view_estimate", "Estimated unique viewers per window", ["content_id", "window"]) watch_time_seconds = Gauge("dht_watch_time_seconds", "Aggregate watch time per window", ["content_id", "window"]) gossip_success = Counter("dht_gossip_success_total", "Successful gossip posts", ["peer"]) gossip_failure = Counter("dht_gossip_failure_total", "Failed gossip posts", ["peer"]) gossip_skipped = Counter("dht_gossip_skipped_total", "Skipped gossip posts due to backoff", ["peer", "reason"]) gossip_backoff = Gauge("dht_gossip_backoff_seconds", "Gossip backoff seconds remaining", ["peer"]) def record_replication_under(content_id: str, have: int) -> None: replication_under.labels(content_id=content_id).inc() def record_replication_over(content_id: str, have: int) -> None: replication_over.labels(content_id=content_id).inc() def record_leader_change(content_id: str) -> None: leader_changes.labels(content_id=content_id).inc() def record_merge_conflict(key: str) -> None: merge_conflicts.labels(key=key).inc() def update_view_metrics(content_id: str, window_id: str, views: int, unique: float, watch_time: int) -> None: view_count_total.labels(content_id=content_id, window=window_id).set(views) unique_estimate.labels(content_id=content_id, window=window_id).set(unique) watch_time_seconds.labels(content_id=content_id, window=window_id).set(watch_time) def record_gossip_success(peer: str) -> None: gossip_success.labels(peer=peer).inc() gossip_backoff.labels(peer=peer).set(0) def record_gossip_failure(peer: str, backoff_sec: float) -> None: gossip_failure.labels(peer=peer).inc() gossip_backoff.labels(peer=peer).set(backoff_sec) def record_gossip_skipped(peer: str, reason: str) -> None: gossip_skipped.labels(peer=peer, reason=reason).inc()