from __future__ import annotations import math import time from dataclasses import dataclass, field from typing import Dict, Any, List, Optional, Tuple from app.core._crypto.signer import Signer from .config import dht_config from .crypto import bits_from_hex, rendezvous_score from .keys import MetaKey from .membership import MembershipState from .prometheus import record_replication_under, record_replication_over, record_leader_change from .store import DHTStore def _now() -> float: return time.time() @dataclass class ReplicaLease: node_id: str lease_id: str issued_at: float expires_at: float asn: Optional[int] ip_first_octet: Optional[int] heartbeat_at: float score: int def renew(self, now: float) -> None: self.heartbeat_at = now self.expires_at = now + dht_config.lease_ttl def is_expired(self, now: float) -> bool: if now >= self.expires_at: return True if now - self.heartbeat_at > dht_config.heartbeat_interval * dht_config.heartbeat_miss_threshold: return True return False def to_dict(self) -> Dict[str, Any]: return { "node_id": self.node_id, "lease_id": self.lease_id, "issued_at": self.issued_at, "expires_at": self.expires_at, "asn": self.asn, "ip_first_octet": self.ip_first_octet, "heartbeat_at": self.heartbeat_at, "score": self.score, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "ReplicaLease": return cls( node_id=str(data["node_id"]), lease_id=str(data["lease_id"]), issued_at=float(data["issued_at"]), expires_at=float(data["expires_at"]), asn=data.get("asn"), ip_first_octet=data.get("ip_first_octet"), heartbeat_at=float(data.get("heartbeat_at", data.get("issued_at"))), score=int(data.get("score", 0)), ) @dataclass class ReplicationState: content_id: str leases: Dict[str, ReplicaLease] = field(default_factory=dict) leader: Optional[str] = None revision: int = 0 conflict_log: List[Dict[str, Any]] = field(default_factory=list) def prune(self, now: float) -> None: for lease_id, lease in list(self.leases.items()): if lease.is_expired(now): self.conflict_log.append( {"type": "LEASE_EXPIRED", "node_id": lease.node_id, "ts": now} ) del self.leases[lease_id] def assign(self, lease: ReplicaLease) -> None: self.leases[lease.lease_id] = lease self.revision += 1 def remove_node(self, node_id: str, reason: str, timestamp: float) -> None: for lease_id, lease in list(self.leases.items()): if lease.node_id == node_id: del self.leases[lease_id] self.conflict_log.append({"type": reason, "node_id": node_id, "ts": timestamp}) self.revision += 1 def heartbeat(self, node_id: str, now: float) -> bool: found = False for lease in self.leases.values(): if lease.node_id == node_id: lease.renew(now) found = True return found def unique_asn(self) -> int: return len({lease.asn for lease in self.leases.values() if lease.asn is not None}) def unique_octets(self) -> int: return len({lease.ip_first_octet for lease in self.leases.values() if lease.ip_first_octet is not None}) def diversity_satisfied(self) -> bool: if len(self.leases) < dht_config.replication_target: return False if self.unique_asn() < dht_config.min_asn_diversity: return False if self.unique_octets() < dht_config.min_ip_octet_diversity: return False return True def to_dict(self) -> Dict[str, Any]: return { "content_id": self.content_id, "leader": self.leader, "revision": self.revision, "replica_leases": {lease_id: lease.to_dict() for lease_id, lease in self.leases.items()}, "conflict_log": list(self.conflict_log)[-100:], # keep tail } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "ReplicationState": state = cls(content_id=str(data.get("content_id", ""))) state.leader = data.get("leader") state.revision = int(data.get("revision", 0)) leases_raw = data.get("replica_leases") or {} for lease_id, payload in leases_raw.items(): state.leases[lease_id] = ReplicaLease.from_dict(payload) state.conflict_log = list(data.get("conflict_log") or []) return state def merge_with(self, other: "ReplicationState") -> "ReplicationState": combined = _merge_states(self, other) return combined class ReplicationManager: def __init__(self, node_id: str, signer: Signer, store: DHTStore): self.node_id = node_id self.signer = signer self.store = store def _load_state(self, content_id: str) -> ReplicationState: key = MetaKey(content_id=content_id) record = self.store.get(key.fingerprint()) if record: return ReplicationState.from_dict(record.value) return ReplicationState(content_id=content_id) def _persist_state(self, state: ReplicationState) -> None: key = MetaKey(content_id=state.content_id) self.store.put( key=str(key), fingerprint=key.fingerprint(), value=state.to_dict(), logical_counter=int(time.time()), merge_strategy=lambda a, b: ReplicationState.from_dict(a) .merge_with(ReplicationState.from_dict(b)) .to_dict(), ) def ensure_replication(self, content_id: str, membership: MembershipState, now: Optional[float] = None) -> ReplicationState: now = now or _now() state = self._load_state(content_id) n_estimate = max(1.0, membership.n_estimate()) p_value = max(0, round(math.log2(max(n_estimate / dht_config.replication_target, 1.0)))) prefix, _ = bits_from_hex(content_id, p_value) active = membership.active_members(include_islands=True) responsible = [] for member in active: node_prefix, _total = bits_from_hex(member["node_id"], p_value) if node_prefix == prefix: responsible.append(member) if not responsible: responsible = active # fall back to all active nodes responsible.sort(key=lambda item: item["node_id"]) leader_id = responsible[0]["node_id"] if responsible else None previous_leader = state.leader state.leader = leader_id if previous_leader and leader_id and previous_leader != leader_id: record_leader_change(content_id) if leader_id != self.node_id: return state # Only leader mutates state state.prune(now) # evaluate diversity leases_by_node = {lease.node_id: lease for lease in state.leases.values()} if not state.diversity_satisfied(): def rank(members): return sorted( ( ( rendezvous_score(content_id, m["node_id"]), m["node_id"], m.get("asn"), m.get("ip_first_octet"), ) for m in members ), key=lambda item: item[0], ) def assign_with_diversity(candidates): added = 0 # Phase 1: prefer candidates that increase ASN/IP octet diversity for score, node_id, asn, ip_octet in candidates: if node_id in leases_by_node: continue before_asn = state.unique_asn() before_oct = state.unique_octets() if ((asn is not None and before_asn < dht_config.min_asn_diversity) or (ip_octet is not None and before_oct < dht_config.min_ip_octet_diversity)): lease = ReplicaLease( node_id=node_id, lease_id=f"{content_id}:{node_id}", issued_at=now, expires_at=now + dht_config.lease_ttl, asn=asn, ip_first_octet=ip_octet, heartbeat_at=now, score=score, ) state.assign(lease) leases_by_node[node_id] = lease added += 1 if state.diversity_satisfied(): return added # Phase 2: fill by score until target for score, node_id, asn, ip_octet in candidates: if node_id in leases_by_node: continue lease = ReplicaLease( node_id=node_id, lease_id=f"{content_id}:{node_id}", issued_at=now, expires_at=now + dht_config.lease_ttl, asn=asn, ip_first_octet=ip_octet, heartbeat_at=now, score=score, ) state.assign(lease) leases_by_node[node_id] = lease added += 1 if state.diversity_satisfied(): return added return added # First, prefer responsible set assign_with_diversity(rank(responsible)) # If under target, add more from the rest of active nodes if not state.diversity_satisfied(): rest = [m for m in active if m["node_id"] not in {n for _, n, *_ in rank(responsible)}] assign_with_diversity(rank(rest)) # Ensure we do not exceed replication target with duplicates if len(state.leases) > dht_config.replication_target: # Drop lowest scoring leases until target satisfied while preserving diversity criteria sorted_leases = sorted(state.leases.values(), key=lambda lease: lease.score, reverse=True) while len(sorted_leases) > dht_config.replication_target: victim = sorted_leases.pop() # lowest score state.remove_node(victim.node_id, reason="OVER_REPLICATED", timestamp=now) record_replication_over(content_id, len(sorted_leases)) if len(state.leases) < dht_config.replication_target: state.conflict_log.append( {"type": "UNDER_REPLICATED", "ts": now, "have": len(state.leases)} ) record_replication_under(content_id, len(state.leases)) self._persist_state(state) return state def heartbeat(self, content_id: str, node_id: str, now: Optional[float] = None) -> bool: now = now or _now() state = self._load_state(content_id) if state.heartbeat(node_id, now): self._persist_state(state) return True return False def _merge_states(left: ReplicationState, right: ReplicationState) -> ReplicationState: # Combine leases preferring latest expiry lease_map: Dict[str, ReplicaLease] = {} for state in (left, right): for lease_id, lease in state.leases.items(): current = lease_map.get(lease_id) if current is None or lease.expires_at > current.expires_at: lease_map[lease_id] = lease merged = ReplicationState(content_id=left.content_id or right.content_id) merged.leader = min(filter(None, [left.leader, right.leader]), default=None) merged.conflict_log = (left.conflict_log + right.conflict_log)[-100:] merged.leases = lease_map merged.revision = max(left.revision, right.revision) + 1 return merged # Inject helper onto ReplicationState for merge strategy