# uploader-bot/app/core/network/dht/metrics.py
#
# 145 lines
# 5.0 KiB
# Python

from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Dict, Any, Optional
from app.core._crypto.signer import Signer
from .config import dht_config
from .crdt import PNCounter, GCounter, HyperLogLog
from .crypto import compute_view_id
from .keys import MetricKey
from .store import DHTStore
from .prometheus import update_view_metrics
@dataclass
class MetricDelta:
    """A single recorded view event for one piece of content.

    Instances are built by ``MetricsAggregator.record_view`` and handed to
    ``ContentMetricsState.apply``.
    """

    content_id: str
    view_id: str
    # NOTE(review): units are not stated in this module — presumably seconds
    # for watch_time and bytes for bytes_out; confirm with callers.
    watch_time: int
    bytes_out: int
    completed: bool
    timestamp: float

    def as_dict(self) -> Dict[str, Any]:
        """Return the six fields as a plain dict keyed by field name."""
        exported = (
            "content_id",
            "view_id",
            "watch_time",
            "bytes_out",
            "completed",
            "timestamp",
        )
        return {field_name: getattr(self, field_name) for field_name in exported}
class ContentMetricsState:
    """Metric counters for one piece of content, owned by one node.

    Bundles five counter objects (``views``, ``unique``, ``watch_time``,
    ``bytes_out``, ``completions``) with a ``logical_counter`` that is bumped
    once for every delta applied locally.
    """

    # Attribute names of the counter objects, in serialisation order.
    _COUNTER_FIELDS = ("views", "unique", "watch_time", "bytes_out", "completions")

    def __init__(self, node_id: str):
        """Create an empty state whose increments are attributed to *node_id*."""
        self.node_id = node_id
        self.views = PNCounter()
        self.unique = HyperLogLog()
        self.watch_time = GCounter()
        self.bytes_out = GCounter()
        self.completions = GCounter()
        self.logical_counter = 0

    def apply(self, delta: MetricDelta) -> None:
        """Fold one view event into the counters.

        Every delta counts as one view and contributes its ``view_id`` to the
        unique-viewer sketch. Watch time, bytes out and completion are only
        recorded when their value is truthy.
        """
        self.logical_counter += 1
        node = self.node_id

        self.views.increment(node, 1)
        self.unique.add(delta.view_id)

        watched = delta.watch_time
        if watched:
            self.watch_time.increment(node, watched)

        sent = delta.bytes_out
        if sent:
            self.bytes_out.increment(node, sent)

        if delta.completed:
            self.completions.increment(node, 1)

    def merge(self, other: "ContentMetricsState") -> "ContentMetricsState":
        """Merge *other* into this state in place and return ``self``.

        Each counter is merged through its own ``merge`` method; the logical
        counter becomes the larger of the two values.
        """
        for field_name in self._COUNTER_FIELDS:
            getattr(self, field_name).merge(getattr(other, field_name))
        if other.logical_counter > self.logical_counter:
            self.logical_counter = other.logical_counter
        return self

    def to_dict(self) -> Dict[str, Any]:
        """Serialise every counter via its ``to_dict`` plus the logical counter."""
        payload: Dict[str, Any] = {
            field_name: getattr(self, field_name).to_dict()
            for field_name in self._COUNTER_FIELDS
        }
        payload["logical_counter"] = self.logical_counter
        return payload

    @classmethod
    def from_dict(cls, node_id: str, data: Dict[str, Any]) -> "ContentMetricsState":
        """Rebuild a state for *node_id* from a ``to_dict`` payload.

        A falsy *data* yields a fresh empty state. Missing or falsy entries
        are decoded from ``{}`` (counters) or treated as ``0`` (logical
        counter).
        """
        state = cls(node_id=node_id)
        if not data:
            return state

        decoders = (
            ("views", PNCounter),
            ("unique", HyperLogLog),
            ("watch_time", GCounter),
            ("bytes_out", GCounter),
            ("completions", GCounter),
        )
        for field_name, counter_type in decoders:
            setattr(state, field_name, counter_type.from_dict(data.get(field_name) or {}))
        state.logical_counter = int(data.get("logical_counter") or 0)
        return state
class MetricsAggregator:
    """Records view events into per-content, per-window state kept in a DHT store."""

    def __init__(self, node_id: str, signer: Signer, store: DHTStore):
        """Remember the owning node id, the signer and the backing store.

        The signer is stored but not used by the methods visible in this
        class.
        """
        self.node_id = node_id
        self.signer = signer
        self.store = store

    def _load(self, content_id: str, window_id: str) -> ContentMetricsState:
        """Fetch the state for (content, window), or a fresh one if none is stored."""
        metric_key = MetricKey(content_id=content_id, window_id=window_id)
        stored = self.store.get(metric_key.fingerprint())
        if not stored:
            return ContentMetricsState(node_id=self.node_id)
        return ContentMetricsState.from_dict(self.node_id, stored.value)

    def _persist(self, content_id: str, window_id: str, state: ContentMetricsState) -> None:
        """Write *state* to the store and push its totals to the Prometheus helper."""

        def merge_payloads(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
            # Handed to the store as its merge strategy: decode both payloads,
            # merge them, and return the merged payload.
            left_state = ContentMetricsState.from_dict(self.node_id, left)
            right_state = ContentMetricsState.from_dict(self.node_id, right)
            return left_state.merge(right_state).to_dict()

        metric_key = MetricKey(content_id=content_id, window_id=window_id)
        self.store.put(
            key=str(metric_key),
            fingerprint=metric_key.fingerprint(),
            value=state.to_dict(),
            logical_counter=state.logical_counter,
            merge_strategy=merge_payloads,
        )
        # The exported figures come from the local state object that was just
        # written, not from a value read back from the store.
        update_view_metrics(
            content_id=content_id,
            window_id=window_id,
            views=state.views.value(),
            unique=state.unique.estimate(),
            watch_time=state.watch_time.value(),
        )

    def record_view(
        self,
        content_id: str,
        viewer_salt: bytes,
        watch_time: int,
        bytes_out: int,
        completed: bool,
        timestamp: Optional[float] = None,
    ) -> MetricDelta:
        """Record one view of *content_id* and return the delta that was applied.

        Args:
            content_id: Identifier of the content that was viewed.
            viewer_salt: Passed with *content_id* to ``compute_view_id`` to
                derive the view id.
            watch_time: Watch time to add for this view.
            bytes_out: Bytes served to add for this view.
            completed: Whether the view counts as a completion.
            timestamp: Event time; defaults to ``time.time()`` when ``None``.
                It selects the metric window via ``MetricKey.window_for``.

        Returns:
            The ``MetricDelta`` built from the arguments.
        """
        event_time = timestamp if timestamp is not None else time.time()
        window_id = MetricKey.window_for(event_time)
        view_id = compute_view_id(content_id, viewer_salt)

        # Load, update, then write back the state for this window.
        current = self._load(content_id, window_id)
        delta = MetricDelta(
            content_id=content_id,
            view_id=view_id,
            watch_time=watch_time,
            bytes_out=bytes_out,
            completed=completed,
            timestamp=event_time,
        )
        current.apply(delta)
        self._persist(content_id, window_id, current)
        return delta